summaryrefslogtreecommitdiffstats
path: root/src/journal-remote
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/journal-remote/browse.html547
-rw-r--r--src/journal-remote/journal-gatewayd.c1045
-rw-r--r--src/journal-remote/journal-remote-main.c1158
-rw-r--r--src/journal-remote/journal-remote-parse.c84
-rw-r--r--src/journal-remote/journal-remote-parse.h20
-rw-r--r--src/journal-remote/journal-remote-write.c105
-rw-r--r--src/journal-remote/journal-remote-write.h39
-rw-r--r--src/journal-remote/journal-remote.c533
-rw-r--r--src/journal-remote/journal-remote.conf.in19
-rw-r--r--src/journal-remote/journal-remote.h65
-rw-r--r--src/journal-remote/journal-upload-journal.c414
-rw-r--r--src/journal-remote/journal-upload.c854
-rw-r--r--src/journal-remote/journal-upload.conf.in18
-rw-r--r--src/journal-remote/journal-upload.h71
-rwxr-xr-xsrc/journal-remote/log-generator.py78
-rw-r--r--src/journal-remote/meson.build71
-rw-r--r--src/journal-remote/microhttpd-util.c312
-rw-r--r--src/journal-remote/microhttpd-util.h78
18 files changed, 5511 insertions, 0 deletions
diff --git a/src/journal-remote/browse.html b/src/journal-remote/browse.html
new file mode 100644
index 0000000..e5162d1
--- /dev/null
+++ b/src/journal-remote/browse.html
@@ -0,0 +1,547 @@
+<!DOCTYPE html>
+<html>
+<head>
+ <title>Journal</title>
+ <meta http-equiv="Content-Type" content="text/html; charset=utf-8"/>
+ <style type="text/css">
+ div#divlogs, div#diventry {
+ font-family: monospace;
+ font-size: 7pt;
+ background-color: #ffffff;
+ padding: 1em;
+ margin: 2em 0em;
+ border-radius: 10px 10px 10px 10px;
+ border: 1px solid threedshadow;
+ white-space: nowrap;
+ overflow-x: scroll;
+ }
+ div#diventry {
+ display: none;
+ }
+ div#divlogs {
+ display: block;
+ }
+ body {
+ background-color: #ededed;
+ color: #313739;
+ font: message-box;
+ margin: 3em;
+ }
+ td.timestamp {
+ text-align: right;
+ border-right: 1px dotted lightgrey;
+ padding-right: 5px;
+ }
+ td.process {
+ border-right: 1px dotted lightgrey;
+ padding-left: 5px;
+ padding-right: 5px;
+ }
+ td.message {
+ padding-left: 5px;
+ }
+ td.message > a:link, td.message > a:visited {
+ text-decoration: none;
+ color: #313739;
+ }
+ td.message-error {
+ padding-left: 5px;
+ color: red;
+ font-weight: bold;
+ }
+ td.message-error > a:link, td.message-error > a:visited {
+ text-decoration: none;
+ color: red;
+ }
+ td.message-highlight {
+ padding-left: 5px;
+ font-weight: bold;
+ }
+ td.message-highlight > a:link, td.message-highlight > a:visited {
+ text-decoration: none;
+ color: #313739;
+ }
+ td > a:hover, td > a:active {
+ text-decoration: underline;
+ color: #c13739;
+ }
+ table#tablelogs, table#tableentry {
+ border-collapse: collapse;
+ }
+ td.field {
+ text-align: right;
+ border-right: 1px dotted lightgrey;
+ padding-right: 5px;
+ }
+ td.data {
+ padding-left: 5px;
+ }
+ div#keynav {
+ text-align: center;
+ font-size: 7pt;
+ color: #818789;
+ padding-top: 2em;
+ }
+ span.key {
+ font-weight: bold;
+ color: #313739;
+ }
+ div#buttonnav {
+ text-align: center;
+ }
+ button {
+ font-size: 18pt;
+ font-weight: bold;
+ width: 2em;
+ height: 2em;
+ }
+ div#filternav {
+ text-align: center;
+ }
+ select {
+ width: 50em;
+ }
+ </style>
+</head>
+
+<body>
+ <!-- TODO:
+ - live display
+ - show red lines for reboots -->
+
+ <h1 id="title"></h1>
+
+ <div id="os"></div>
+ <div id="virtualization"></div>
+ <div id="cutoff"></div>
+ <div id="machine"></div>
+ <div id="usage"></div>
+ <div id="showing"></div>
+
+ <div id="filternav">
+ <select id="filter" onchange="onFilterChange(this);" onfocus="onFilterFocus(this);">
+ <option>No filter</option>
+ </select>
+ &nbsp;&nbsp;&nbsp;&nbsp;
+ <input id="boot" type="checkbox" onchange="onBootChange(this);">Only current boot</input>
+ </div>
+
+ <div id="divlogs"><table id="tablelogs"></table></div>
+ <a name="entry"></a>
+ <div id="diventry"><table id="tableentry"></table></div>
+
+ <div id="buttonnav">
+ <button id="head" onclick="entriesLoadHead();" title="First Page">&#8676;</button>
+ <button id="previous" type="button" onclick="entriesLoadPrevious();" title="Previous Page"/>&#8592;</button>
+ <button id="next" type="button" onclick="entriesLoadNext();" title="Next Page"/>&#8594;</button>
+ <button id="tail" type="button" onclick="entriesLoadTail();" title="Last Page"/>&#8677;</button>
+ &nbsp;&nbsp;&nbsp;&nbsp;
+ <button id="more" type="button" onclick="entriesMore();" title="More Entries"/>+</button>
+ <button id="less" type="button" onclick="entriesLess();" title="Fewer Entries"/>-</button>
+ </div>
+
+ <div id="keynav">
+ <span class="key">g</span>: First Page &nbsp;&nbsp;&nbsp;&nbsp;
+ <span class="key">&#8592;, k, BACKSPACE</span>: Previous Page &nbsp;&nbsp;&nbsp;&nbsp;
+ <span class="key">&#8594;, j, SPACE</span>: Next Page &nbsp;&nbsp;&nbsp;&nbsp;
+ <span class="key">G</span>: Last Page &nbsp;&nbsp;&nbsp;&nbsp;
+ <span class="key">+</span>: More entries &nbsp;&nbsp;&nbsp;&nbsp;
+ <span class="key">-</span>: Fewer entries
+ </div>
+
+ <script type="text/javascript">
+ var first_cursor = null;
+ var last_cursor = null;
+
+ function getNEntries() {
+ var n;
+ n = localStorage["n_entries"];
+ if (n == null)
+ return 50;
+ n = parseInt(n);
+ if (n < 10)
+ return 10;
+ if (n > 1000)
+ return 1000;
+ return n;
+ }
+
+ function showNEntries(n) {
+ var showing = document.getElementById("showing");
+ showing.innerHTML = "Showing <b>" + n.toString() + "</b> entries.";
+ }
+
+ function setNEntries(n) {
+ if (n < 10)
+ return 10;
+ if (n > 1000)
+ return 1000;
+ localStorage["n_entries"] = n.toString();
+ showNEntries(n);
+ }
+
+ function machineLoad() {
+ var request = new XMLHttpRequest();
+ request.open("GET", "/machine");
+ request.onreadystatechange = machineOnResult;
+ request.setRequestHeader("Accept", "application/json");
+ request.send(null);
+ }
+
+ function formatBytes(u) {
+ if (u >= 1024*1024*1024*1024)
+ return (u/1024/1024/1024/1024).toFixed(1) + " TiB";
+ else if (u >= 1024*1024*1024)
+ return (u/1024/1024/1024).toFixed(1) + " GiB";
+ else if (u >= 1024*1024)
+ return (u/1024/1024).toFixed(1) + " MiB";
+ else if (u >= 1024)
+ return (u/1024).toFixed(1) + " KiB";
+ else
+ return u.toString() + " B";
+ }
+
+ function escapeHTML(s) {
+ return s.replace(/&/g, "&amp;").replace(/</g, "&lt;").replace(/>/g, "&gt;");
+ }
+
+ function machineOnResult(event) {
+ if ((event.currentTarget.readyState != 4) ||
+ (event.currentTarget.status != 200 && event.currentTarget.status != 0))
+ return;
+
+ var d = JSON.parse(event.currentTarget.responseText);
+
+ var title = document.getElementById("title");
+ title.innerHTML = 'Journal of ' + escapeHTML(d.hostname);
+ document.title = 'Journal of ' + escapeHTML(d.hostname);
+
+ var machine = document.getElementById("machine");
+ machine.innerHTML = 'Machine ID is <b>' + d.machine_id + '</b>, current boot ID is <b>' + d.boot_id + '</b>.';
+
+ var cutoff = document.getElementById("cutoff");
+ var from = new Date(parseInt(d.cutoff_from_realtime) / 1000);
+ var to = new Date(parseInt(d.cutoff_to_realtime) / 1000);
+ cutoff.innerHTML = 'Journal begins at <b>' + from.toLocaleString() + '</b> and ends at <b>' + to.toLocaleString() + '</b>.';
+
+ var usage = document.getElementById("usage");
+ usage.innerHTML = 'Disk usage is <b>' + formatBytes(parseInt(d.usage)) + '</b>.';
+
+ var os = document.getElementById("os");
+ os.innerHTML = 'Operating system is <b>' + escapeHTML(d.os_pretty_name) + '</b>.';
+
+ var virtualization = document.getElementById("virtualization");
+ virtualization.innerHTML = d.virtualization == "bare" ? "Running on <b>bare metal</b>." : "Running on virtualization <b>" + escapeHTML(d.virtualization) + "</b>.";
+ }
+
+ function entriesLoad(range) {
+
+ if (range == null) {
+ if (localStorage["cursor"] != null && localStorage["cursor"] != "")
+ range = localStorage["cursor"] + ":0";
+ else
+ range = "";
+ }
+
+ var url = "/entries";
+
+ if (localStorage["filter"] != "" && localStorage["filter"] != null) {
+ url += "?_SYSTEMD_UNIT=" + escape(localStorage["filter"]);
+
+ if (localStorage["boot"] == "1")
+ url += "&boot";
+ } else {
+ if (localStorage["boot"] == "1")
+ url += "?boot";
+ }
+
+ var request = new XMLHttpRequest();
+ request.open("GET", url);
+ request.onreadystatechange = entriesOnResult;
+ request.setRequestHeader("Accept", "application/json");
+ request.setRequestHeader("Range", "entries=" + range + ":" + getNEntries().toString());
+ request.send(null);
+ }
+
+ function entriesLoadNext() {
+ if (last_cursor == null)
+ entriesLoad("");
+ else
+ entriesLoad(last_cursor + ":1");
+ }
+
+ function entriesLoadPrevious() {
+ if (first_cursor == null)
+ entriesLoad("");
+ else
+ entriesLoad(first_cursor + ":-" + getNEntries().toString());
+ }
+
+ function entriesLoadHead() {
+ entriesLoad("");
+ }
+
+ function entriesLoadTail() {
+ entriesLoad(":-" + getNEntries().toString());
+ }
+
+ function entriesOnResult(event) {
+
+ if ((event.currentTarget.readyState != 4) ||
+ (event.currentTarget.status != 200 && event.currentTarget.status != 0))
+ return;
+
+ var logs = document.getElementById("tablelogs");
+
+ var lc = null;
+ var fc = null;
+
+ var i, l = event.currentTarget.responseText.split('\n');
+
+ if (l.length <= 1) {
+ logs.innerHTML = '<tbody><tr><td colspan="3"><i>No further entries...</i></td></tr></tbody>';
+ return;
+ }
+
+ var buf = '';
+
+ for (i in l) {
+ if (l[i] == '')
+ continue;
+
+ var d = JSON.parse(l[i]);
+ if (d.MESSAGE == undefined || d.__CURSOR == undefined)
+ continue;
+
+ if (fc == null)
+ fc = d.__CURSOR;
+ lc = d.__CURSOR;
+
+ var priority;
+ if (d.PRIORITY != undefined)
+ priority = parseInt(d.PRIORITY);
+ else
+ priority = 6;
+
+ var clazz;
+ if (priority <= 3)
+ clazz = "message-error";
+ else if (priority <= 5)
+ clazz = "message-highlight";
+ else
+ clazz = "message";
+
+ buf += '<tr><td class="timestamp">';
+
+ if (d.__REALTIME_TIMESTAMP != undefined) {
+ var timestamp = new Date(parseInt(d.__REALTIME_TIMESTAMP) / 1000);
+ buf += timestamp.toLocaleString();
+ }
+
+ buf += '</td><td class="process">';
+
+ if (d.SYSLOG_IDENTIFIER != undefined)
+ buf += escapeHTML(d.SYSLOG_IDENTIFIER);
+ else if (d._COMM != undefined)
+ buf += escapeHTML(d._COMM);
+
+ if (d._PID != undefined)
+ buf += "[" + escapeHTML(d._PID) + "]";
+ else if (d.SYSLOG_PID != undefined)
+ buf += "[" + escapeHTML(d.SYSLOG_PID) + "]";
+
+ buf += '</td><td class="' + clazz + '"><a href="#entry" onclick="onMessageClick(\'' + d.__CURSOR + '\');">';
+
+ if (d.MESSAGE == null)
+ buf += "[blob data]";
+ else if (d.MESSAGE instanceof Array)
+ buf += "[" + formatBytes(d.MESSAGE.length) + " blob data]";
+ else
+ buf += escapeHTML(d.MESSAGE);
+
+ buf += '</a></td></tr>';
+ }
+
+ logs.innerHTML = '<tbody>' + buf + '</tbody>';
+
+ if (fc != null) {
+ first_cursor = fc;
+ localStorage["cursor"] = fc;
+ }
+ if (lc != null)
+ last_cursor = lc;
+ }
+
+ function entriesMore() {
+ setNEntries(getNEntries() + 10);
+ entriesLoad(first_cursor);
+ }
+
+ function entriesLess() {
+ setNEntries(getNEntries() - 10);
+ entriesLoad(first_cursor);
+ }
+
+ function onResultMessageClick(event) {
+ if ((event.currentTarget.readyState != 4) ||
+ (event.currentTarget.status != 200 && event.currentTarget.status != 0))
+ return;
+
+ var d = JSON.parse(event.currentTarget.responseText);
+
+ document.getElementById("diventry").style.display = "block";
+ var entry = document.getElementById("tableentry");
+
+ var buf = "";
+ for (var key in d) {
+ var data = d[key];
+
+ if (data == null)
+ data = "[blob data]";
+ else if (data instanceof Array)
+ data = "[" + formatBytes(data.length) + " blob data]";
+ else
+ data = escapeHTML(data);
+
+ buf += '<tr><td class="field">' + key + '</td><td class="data">' + data + '</td></tr>';
+ }
+ entry.innerHTML = '<tbody>' + buf + '</tbody>';
+ }
+
+ function onMessageClick(t) {
+ var request = new XMLHttpRequest();
+ request.open("GET", "/entries?discrete");
+ request.onreadystatechange = onResultMessageClick;
+ request.setRequestHeader("Accept", "application/json");
+ request.setRequestHeader("Range", "entries=" + t + ":0:1");
+ request.send(null);
+ }
+
+ function onKeyUp(event) {
+ switch (event.keyCode) {
+ case 8:
+ case 37:
+ case 75:
+ entriesLoadPrevious();
+ break;
+ case 32:
+ case 39:
+ case 74:
+ entriesLoadNext();
+ break;
+
+ case 71:
+ if (event.shiftKey)
+ entriesLoadTail();
+ else
+ entriesLoadHead();
+ break;
+ case 171:
+ entriesMore();
+ break;
+ case 173:
+ entriesLess();
+ break;
+ }
+ }
+
+ function onMouseWheel(event) {
+ if (event.detail < 0 || event.wheelDelta > 0)
+ entriesLoadPrevious();
+ else
+ entriesLoadNext();
+ }
+
+ function onResultFilterFocus(event) {
+ if ((event.currentTarget.readyState != 4) ||
+ (event.currentTarget.status != 200 && event.currentTarget.status != 0))
+ return;
+
+ var f = document.getElementById("filter");
+
+ var l = event.currentTarget.responseText.split('\n');
+ var buf = '<option>No filter</option>';
+ var j = -1;
+
+ for (i in l) {
+
+ if (l[i] == '')
+ continue;
+
+ var d = JSON.parse(l[i]);
+ if (d._SYSTEMD_UNIT == undefined)
+ continue;
+
+ buf += '<option value="' + escape(d._SYSTEMD_UNIT) + '">' + escapeHTML(d._SYSTEMD_UNIT) + '</option>';
+
+ if (d._SYSTEMD_UNIT == localStorage["filter"])
+ j = i;
+ }
+
+ if (j < 0) {
+ if (localStorage["filter"] != null && localStorage["filter"] != "") {
+ buf += '<option value="' + escape(localStorage["filter"]) + '">' + escapeHTML(localStorage["filter"]) + '</option>';
+ j = i + 1;
+ } else
+ j = 0;
+ }
+
+ f.innerHTML = buf;
+ f.selectedIndex = j;
+ }
+
+ function onFilterFocus(w) {
+ var request = new XMLHttpRequest();
+ request.open("GET", "/fields/_SYSTEMD_UNIT");
+ request.onreadystatechange = onResultFilterFocus;
+ request.setRequestHeader("Accept", "application/json");
+ request.send(null);
+ }
+
+ function onFilterChange(w) {
+ if (w.selectedIndex <= 0)
+ localStorage["filter"] = "";
+ else
+ localStorage["filter"] = unescape(w.options[w.selectedIndex].value);
+
+ entriesLoadHead();
+ }
+
+ function onBootChange(w) {
+ localStorage["boot"] = w.checked ? "1" : "0";
+ entriesLoadHead();
+ }
+
+ function initFilter() {
+ var f = document.getElementById("filter");
+
+ var buf = '<option>No filter</option>';
+
+ var filter = localStorage["filter"];
+ var j;
+ if (filter != null && filter != "") {
+ buf += '<option value="' + escape(filter) + '">' + escapeHTML(filter) + '</option>';
+ j = 1;
+ } else
+ j = 0;
+
+ f.innerHTML = buf;
+ f.selectedIndex = j;
+ }
+
+ function installHandlers() {
+ document.onkeyup = onKeyUp;
+
+ var logs = document.getElementById("divlogs");
+ logs.addEventListener("mousewheel", onMouseWheel, false);
+ logs.addEventListener("DOMMouseScroll", onMouseWheel, false);
+ }
+
+ machineLoad();
+ entriesLoad(null);
+ showNEntries(getNEntries());
+ initFilter();
+ installHandlers();
+ </script>
+</body>
+</html>
diff --git a/src/journal-remote/journal-gatewayd.c b/src/journal-remote/journal-gatewayd.c
new file mode 100644
index 0000000..af45fa5
--- /dev/null
+++ b/src/journal-remote/journal-gatewayd.c
@@ -0,0 +1,1045 @@
+/* SPDX-License-Identifier: LGPL-2.1+ */
+
+#include <fcntl.h>
+#include <getopt.h>
+#include <microhttpd.h>
+#include <stdlib.h>
+#include <string.h>
+#include <unistd.h>
+
+#include "sd-bus.h"
+#include "sd-daemon.h"
+#include "sd-journal.h"
+
+#include "alloc-util.h"
+#include "bus-util.h"
+#include "fd-util.h"
+#include "fileio.h"
+#include "hostname-util.h"
+#include "log.h"
+#include "logs-show.h"
+#include "main-func.h"
+#include "microhttpd-util.h"
+#include "os-util.h"
+#include "parse-util.h"
+#include "pretty-print.h"
+#include "sigbus.h"
+#include "tmpfile-util.h"
+#include "util.h"
+
+#define JOURNAL_WAIT_TIMEOUT (10*USEC_PER_SEC)
+
+static char *arg_key_pem = NULL;
+static char *arg_cert_pem = NULL;
+static char *arg_trust_pem = NULL;
+static const char *arg_directory = NULL;
+
+STATIC_DESTRUCTOR_REGISTER(arg_key_pem, freep);
+STATIC_DESTRUCTOR_REGISTER(arg_cert_pem, freep);
+STATIC_DESTRUCTOR_REGISTER(arg_trust_pem, freep);
+
+typedef struct RequestMeta {
+ sd_journal *journal;
+
+ OutputMode mode;
+
+ char *cursor;
+ int64_t n_skip;
+ uint64_t n_entries;
+ bool n_entries_set;
+
+ FILE *tmp;
+ uint64_t delta, size;
+
+ int argument_parse_error;
+
+ bool follow;
+ bool discrete;
+
+ uint64_t n_fields;
+ bool n_fields_set;
+} RequestMeta;
+
+static const char* const mime_types[_OUTPUT_MODE_MAX] = {
+ [OUTPUT_SHORT] = "text/plain",
+ [OUTPUT_JSON] = "application/json",
+ [OUTPUT_JSON_SSE] = "text/event-stream",
+ [OUTPUT_JSON_SEQ] = "application/json-seq",
+ [OUTPUT_EXPORT] = "application/vnd.fdo.journal",
+};
+
+static RequestMeta *request_meta(void **connection_cls) {
+ RequestMeta *m;
+
+ assert(connection_cls);
+ if (*connection_cls)
+ return *connection_cls;
+
+ m = new0(RequestMeta, 1);
+ if (!m)
+ return NULL;
+
+ *connection_cls = m;
+ return m;
+}
+
+static void request_meta_free(
+ void *cls,
+ struct MHD_Connection *connection,
+ void **connection_cls,
+ enum MHD_RequestTerminationCode toe) {
+
+ RequestMeta *m = *connection_cls;
+
+ if (!m)
+ return;
+
+ sd_journal_close(m->journal);
+
+ safe_fclose(m->tmp);
+
+ free(m->cursor);
+ free(m);
+}
+
+static int open_journal(RequestMeta *m) {
+ assert(m);
+
+ if (m->journal)
+ return 0;
+
+ if (arg_directory)
+ return sd_journal_open_directory(&m->journal, arg_directory, 0);
+ else
+ return sd_journal_open(&m->journal, SD_JOURNAL_LOCAL_ONLY|SD_JOURNAL_SYSTEM);
+}
+
+static int request_meta_ensure_tmp(RequestMeta *m) {
+ assert(m);
+
+ if (m->tmp)
+ rewind(m->tmp);
+ else {
+ int fd;
+
+ fd = open_tmpfile_unlinkable("/tmp", O_RDWR|O_CLOEXEC);
+ if (fd < 0)
+ return fd;
+
+ m->tmp = fdopen(fd, "w+");
+ if (!m->tmp) {
+ safe_close(fd);
+ return -errno;
+ }
+ }
+
+ return 0;
+}
+
+static ssize_t request_reader_entries(
+ void *cls,
+ uint64_t pos,
+ char *buf,
+ size_t max) {
+
+ RequestMeta *m = cls;
+ int r;
+ size_t n, k;
+
+ assert(m);
+ assert(buf);
+ assert(max > 0);
+ assert(pos >= m->delta);
+
+ pos -= m->delta;
+
+ while (pos >= m->size) {
+ off_t sz;
+
+ /* End of this entry, so let's serialize the next
+ * one */
+
+ if (m->n_entries_set &&
+ m->n_entries <= 0)
+ return MHD_CONTENT_READER_END_OF_STREAM;
+
+ if (m->n_skip < 0)
+ r = sd_journal_previous_skip(m->journal, (uint64_t) -m->n_skip + 1);
+ else if (m->n_skip > 0)
+ r = sd_journal_next_skip(m->journal, (uint64_t) m->n_skip + 1);
+ else
+ r = sd_journal_next(m->journal);
+
+ if (r < 0) {
+ log_error_errno(r, "Failed to advance journal pointer: %m");
+ return MHD_CONTENT_READER_END_WITH_ERROR;
+ } else if (r == 0) {
+
+ if (m->follow) {
+ r = sd_journal_wait(m->journal, (uint64_t) JOURNAL_WAIT_TIMEOUT);
+ if (r < 0) {
+ log_error_errno(r, "Couldn't wait for journal event: %m");
+ return MHD_CONTENT_READER_END_WITH_ERROR;
+ }
+ if (r == SD_JOURNAL_NOP)
+ break;
+
+ continue;
+ }
+
+ return MHD_CONTENT_READER_END_OF_STREAM;
+ }
+
+ if (m->discrete) {
+ assert(m->cursor);
+
+ r = sd_journal_test_cursor(m->journal, m->cursor);
+ if (r < 0) {
+ log_error_errno(r, "Failed to test cursor: %m");
+ return MHD_CONTENT_READER_END_WITH_ERROR;
+ }
+
+ if (r == 0)
+ return MHD_CONTENT_READER_END_OF_STREAM;
+ }
+
+ pos -= m->size;
+ m->delta += m->size;
+
+ if (m->n_entries_set)
+ m->n_entries -= 1;
+
+ m->n_skip = 0;
+
+ r = request_meta_ensure_tmp(m);
+ if (r < 0) {
+ log_error_errno(r, "Failed to create temporary file: %m");
+ return MHD_CONTENT_READER_END_WITH_ERROR;
+ }
+
+ r = show_journal_entry(m->tmp, m->journal, m->mode, 0, OUTPUT_FULL_WIDTH,
+ NULL, NULL, NULL);
+ if (r < 0) {
+ log_error_errno(r, "Failed to serialize item: %m");
+ return MHD_CONTENT_READER_END_WITH_ERROR;
+ }
+
+ sz = ftello(m->tmp);
+ if (sz == (off_t) -1) {
+ log_error_errno(errno, "Failed to retrieve file position: %m");
+ return MHD_CONTENT_READER_END_WITH_ERROR;
+ }
+
+ m->size = (uint64_t) sz;
+ }
+
+ if (m->tmp == NULL && m->follow)
+ return 0;
+
+ if (fseeko(m->tmp, pos, SEEK_SET) < 0) {
+ log_error_errno(errno, "Failed to seek to position: %m");
+ return MHD_CONTENT_READER_END_WITH_ERROR;
+ }
+
+ n = m->size - pos;
+ if (n < 1)
+ return 0;
+ if (n > max)
+ n = max;
+
+ errno = 0;
+ k = fread(buf, 1, n, m->tmp);
+ if (k != n) {
+ log_error("Failed to read from file: %s", errno ? strerror(errno) : "Premature EOF");
+ return MHD_CONTENT_READER_END_WITH_ERROR;
+ }
+
+ return (ssize_t) k;
+}
+
+static int request_parse_accept(
+ RequestMeta *m,
+ struct MHD_Connection *connection) {
+
+ const char *header;
+
+ assert(m);
+ assert(connection);
+
+ header = MHD_lookup_connection_value(connection, MHD_HEADER_KIND, "Accept");
+ if (!header)
+ return 0;
+
+ if (streq(header, mime_types[OUTPUT_JSON]))
+ m->mode = OUTPUT_JSON;
+ else if (streq(header, mime_types[OUTPUT_JSON_SSE]))
+ m->mode = OUTPUT_JSON_SSE;
+ else if (streq(header, mime_types[OUTPUT_JSON_SEQ]))
+ m->mode = OUTPUT_JSON_SEQ;
+ else if (streq(header, mime_types[OUTPUT_EXPORT]))
+ m->mode = OUTPUT_EXPORT;
+ else
+ m->mode = OUTPUT_SHORT;
+
+ return 0;
+}
+
+static int request_parse_range(
+ RequestMeta *m,
+ struct MHD_Connection *connection) {
+
+ const char *range, *colon, *colon2;
+ int r;
+
+ assert(m);
+ assert(connection);
+
+ range = MHD_lookup_connection_value(connection, MHD_HEADER_KIND, "Range");
+ if (!range)
+ return 0;
+
+ if (!startswith(range, "entries="))
+ return 0;
+
+ range += 8;
+ range += strspn(range, WHITESPACE);
+
+ colon = strchr(range, ':');
+ if (!colon)
+ m->cursor = strdup(range);
+ else {
+ const char *p;
+
+ colon2 = strchr(colon + 1, ':');
+ if (colon2) {
+ _cleanup_free_ char *t;
+
+ t = strndup(colon + 1, colon2 - colon - 1);
+ if (!t)
+ return -ENOMEM;
+
+ r = safe_atoi64(t, &m->n_skip);
+ if (r < 0)
+ return r;
+ }
+
+ p = (colon2 ? colon2 : colon) + 1;
+ if (*p) {
+ r = safe_atou64(p, &m->n_entries);
+ if (r < 0)
+ return r;
+
+ if (m->n_entries <= 0)
+ return -EINVAL;
+
+ m->n_entries_set = true;
+ }
+
+ m->cursor = strndup(range, colon - range);
+ }
+
+ if (!m->cursor)
+ return -ENOMEM;
+
+ m->cursor[strcspn(m->cursor, WHITESPACE)] = 0;
+ if (isempty(m->cursor))
+ m->cursor = mfree(m->cursor);
+
+ return 0;
+}
+
+static int request_parse_arguments_iterator(
+ void *cls,
+ enum MHD_ValueKind kind,
+ const char *key,
+ const char *value) {
+
+ RequestMeta *m = cls;
+ _cleanup_free_ char *p = NULL;
+ int r;
+
+ assert(m);
+
+ if (isempty(key)) {
+ m->argument_parse_error = -EINVAL;
+ return MHD_NO;
+ }
+
+ if (streq(key, "follow")) {
+ if (isempty(value)) {
+ m->follow = true;
+ return MHD_YES;
+ }
+
+ r = parse_boolean(value);
+ if (r < 0) {
+ m->argument_parse_error = r;
+ return MHD_NO;
+ }
+
+ m->follow = r;
+ return MHD_YES;
+ }
+
+ if (streq(key, "discrete")) {
+ if (isempty(value)) {
+ m->discrete = true;
+ return MHD_YES;
+ }
+
+ r = parse_boolean(value);
+ if (r < 0) {
+ m->argument_parse_error = r;
+ return MHD_NO;
+ }
+
+ m->discrete = r;
+ return MHD_YES;
+ }
+
+ if (streq(key, "boot")) {
+ if (isempty(value))
+ r = true;
+ else {
+ r = parse_boolean(value);
+ if (r < 0) {
+ m->argument_parse_error = r;
+ return MHD_NO;
+ }
+ }
+
+ if (r) {
+ char match[9 + 32 + 1] = "_BOOT_ID=";
+ sd_id128_t bid;
+
+ r = sd_id128_get_boot(&bid);
+ if (r < 0) {
+ log_error_errno(r, "Failed to get boot ID: %m");
+ return MHD_NO;
+ }
+
+ sd_id128_to_string(bid, match + 9);
+ r = sd_journal_add_match(m->journal, match, sizeof(match)-1);
+ if (r < 0) {
+ m->argument_parse_error = r;
+ return MHD_NO;
+ }
+ }
+
+ return MHD_YES;
+ }
+
+ p = strjoin(key, "=", strempty(value));
+ if (!p) {
+ m->argument_parse_error = log_oom();
+ return MHD_NO;
+ }
+
+ r = sd_journal_add_match(m->journal, p, 0);
+ if (r < 0) {
+ m->argument_parse_error = r;
+ return MHD_NO;
+ }
+
+ return MHD_YES;
+}
+
+static int request_parse_arguments(
+ RequestMeta *m,
+ struct MHD_Connection *connection) {
+
+ assert(m);
+ assert(connection);
+
+ m->argument_parse_error = 0;
+ MHD_get_connection_values(connection, MHD_GET_ARGUMENT_KIND, request_parse_arguments_iterator, m);
+
+ return m->argument_parse_error;
+}
+
+static int request_handler_entries(
+ struct MHD_Connection *connection,
+ void *connection_cls) {
+
+ _cleanup_(MHD_destroy_responsep) struct MHD_Response *response = NULL;
+ RequestMeta *m = connection_cls;
+ int r;
+
+ assert(connection);
+ assert(m);
+
+ r = open_journal(m);
+ if (r < 0)
+ return mhd_respondf(connection, r, MHD_HTTP_INTERNAL_SERVER_ERROR, "Failed to open journal: %m");
+
+ if (request_parse_accept(m, connection) < 0)
+ return mhd_respond(connection, MHD_HTTP_BAD_REQUEST, "Failed to parse Accept header.");
+
+ if (request_parse_range(m, connection) < 0)
+ return mhd_respond(connection, MHD_HTTP_BAD_REQUEST, "Failed to parse Range header.");
+
+ if (request_parse_arguments(m, connection) < 0)
+ return mhd_respond(connection, MHD_HTTP_BAD_REQUEST, "Failed to parse URL arguments.");
+
+ if (m->discrete) {
+ if (!m->cursor)
+ return mhd_respond(connection, MHD_HTTP_BAD_REQUEST, "Discrete seeks require a cursor specification.");
+
+ m->n_entries = 1;
+ m->n_entries_set = true;
+ }
+
+ if (m->cursor)
+ r = sd_journal_seek_cursor(m->journal, m->cursor);
+ else if (m->n_skip >= 0)
+ r = sd_journal_seek_head(m->journal);
+ else if (m->n_skip < 0)
+ r = sd_journal_seek_tail(m->journal);
+ if (r < 0)
+ return mhd_respond(connection, MHD_HTTP_BAD_REQUEST, "Failed to seek in journal.");
+
+ response = MHD_create_response_from_callback(MHD_SIZE_UNKNOWN, 4*1024, request_reader_entries, m, NULL);
+ if (!response)
+ return respond_oom(connection);
+
+ MHD_add_response_header(response, "Content-Type", mime_types[m->mode]);
+ return MHD_queue_response(connection, MHD_HTTP_OK, response);
+}
+
+static int output_field(FILE *f, OutputMode m, const char *d, size_t l) {
+ const char *eq;
+ size_t j;
+
+ eq = memchr(d, '=', l);
+ if (!eq)
+ return -EINVAL;
+
+ j = l - (eq - d + 1);
+
+ if (m == OUTPUT_JSON) {
+ fprintf(f, "{ \"%.*s\" : ", (int) (eq - d), d);
+ json_escape(f, eq+1, j, OUTPUT_FULL_WIDTH);
+ fputs(" }\n", f);
+ } else {
+ fwrite(eq+1, 1, j, f);
+ fputc('\n', f);
+ }
+
+ return 0;
+}
+
+static ssize_t request_reader_fields(
+ void *cls,
+ uint64_t pos,
+ char *buf,
+ size_t max) {
+
+ RequestMeta *m = cls;
+ int r;
+ size_t n, k;
+
+ assert(m);
+ assert(buf);
+ assert(max > 0);
+ assert(pos >= m->delta);
+
+ pos -= m->delta;
+
+ while (pos >= m->size) {
+ off_t sz;
+ const void *d;
+ size_t l;
+
+ /* End of this field, so let's serialize the next
+ * one */
+
+ if (m->n_fields_set &&
+ m->n_fields <= 0)
+ return MHD_CONTENT_READER_END_OF_STREAM;
+
+ r = sd_journal_enumerate_unique(m->journal, &d, &l);
+ if (r < 0) {
+ log_error_errno(r, "Failed to advance field index: %m");
+ return MHD_CONTENT_READER_END_WITH_ERROR;
+ } else if (r == 0)
+ return MHD_CONTENT_READER_END_OF_STREAM;
+
+ pos -= m->size;
+ m->delta += m->size;
+
+ if (m->n_fields_set)
+ m->n_fields -= 1;
+
+ r = request_meta_ensure_tmp(m);
+ if (r < 0) {
+ log_error_errno(r, "Failed to create temporary file: %m");
+ return MHD_CONTENT_READER_END_WITH_ERROR;
+ }
+
+ r = output_field(m->tmp, m->mode, d, l);
+ if (r < 0) {
+ log_error_errno(r, "Failed to serialize item: %m");
+ return MHD_CONTENT_READER_END_WITH_ERROR;
+ }
+
+ sz = ftello(m->tmp);
+ if (sz == (off_t) -1) {
+ log_error_errno(errno, "Failed to retrieve file position: %m");
+ return MHD_CONTENT_READER_END_WITH_ERROR;
+ }
+
+ m->size = (uint64_t) sz;
+ }
+
+ if (fseeko(m->tmp, pos, SEEK_SET) < 0) {
+ log_error_errno(errno, "Failed to seek to position: %m");
+ return MHD_CONTENT_READER_END_WITH_ERROR;
+ }
+
+ n = m->size - pos;
+ if (n > max)
+ n = max;
+
+ errno = 0;
+ k = fread(buf, 1, n, m->tmp);
+ if (k != n) {
+ log_error("Failed to read from file: %s", errno ? strerror(errno) : "Premature EOF");
+ return MHD_CONTENT_READER_END_WITH_ERROR;
+ }
+
+ return (ssize_t) k;
+}
+
+static int request_handler_fields(
+ struct MHD_Connection *connection,
+ const char *field,
+ void *connection_cls) {
+
+ _cleanup_(MHD_destroy_responsep) struct MHD_Response *response = NULL;
+ RequestMeta *m = connection_cls;
+ int r;
+
+ assert(connection);
+ assert(m);
+
+ r = open_journal(m);
+ if (r < 0)
+ return mhd_respondf(connection, r, MHD_HTTP_INTERNAL_SERVER_ERROR, "Failed to open journal: %m");
+
+ if (request_parse_accept(m, connection) < 0)
+ return mhd_respond(connection, MHD_HTTP_BAD_REQUEST, "Failed to parse Accept header.");
+
+ r = sd_journal_query_unique(m->journal, field);
+ if (r < 0)
+ return mhd_respond(connection, MHD_HTTP_BAD_REQUEST, "Failed to query unique fields.");
+
+ response = MHD_create_response_from_callback(MHD_SIZE_UNKNOWN, 4*1024, request_reader_fields, m, NULL);
+ if (!response)
+ return respond_oom(connection);
+
+ MHD_add_response_header(response, "Content-Type", mime_types[m->mode == OUTPUT_JSON ? OUTPUT_JSON : OUTPUT_SHORT]);
+ return MHD_queue_response(connection, MHD_HTTP_OK, response);
+}
+
+static int request_handler_redirect(
+ struct MHD_Connection *connection,
+ const char *target) {
+
+ char *page;
+ _cleanup_(MHD_destroy_responsep) struct MHD_Response *response = NULL;
+
+ assert(connection);
+ assert(target);
+
+ if (asprintf(&page, "<html><body>Please continue to the <a href=\"%s\">journal browser</a>.</body></html>", target) < 0)
+ return respond_oom(connection);
+
+ response = MHD_create_response_from_buffer(strlen(page), page, MHD_RESPMEM_MUST_FREE);
+ if (!response) {
+ free(page);
+ return respond_oom(connection);
+ }
+
+ MHD_add_response_header(response, "Content-Type", "text/html");
+ MHD_add_response_header(response, "Location", target);
+ return MHD_queue_response(connection, MHD_HTTP_MOVED_PERMANENTLY, response);
+}
+
+static int request_handler_file(
+ struct MHD_Connection *connection,
+ const char *path,
+ const char *mime_type) {
+
+ _cleanup_(MHD_destroy_responsep) struct MHD_Response *response = NULL;
+ _cleanup_close_ int fd = -1;
+ struct stat st;
+
+ assert(connection);
+ assert(path);
+ assert(mime_type);
+
+ fd = open(path, O_RDONLY|O_CLOEXEC);
+ if (fd < 0)
+ return mhd_respondf(connection, errno, MHD_HTTP_NOT_FOUND, "Failed to open file %s: %m", path);
+
+ if (fstat(fd, &st) < 0)
+ return mhd_respondf(connection, errno, MHD_HTTP_INTERNAL_SERVER_ERROR, "Failed to stat file: %m");
+
+ response = MHD_create_response_from_fd_at_offset64(st.st_size, fd, 0);
+ if (!response)
+ return respond_oom(connection);
+ TAKE_FD(fd);
+
+ MHD_add_response_header(response, "Content-Type", mime_type);
+ return MHD_queue_response(connection, MHD_HTTP_OK, response);
+}
+
+static int get_virtualization(char **v) {
+ _cleanup_(sd_bus_unrefp) sd_bus *bus = NULL;
+ char *b = NULL;
+ int r;
+
+ r = sd_bus_default_system(&bus);
+ if (r < 0)
+ return r;
+
+ r = sd_bus_get_property_string(
+ bus,
+ "org.freedesktop.systemd1",
+ "/org/freedesktop/systemd1",
+ "org.freedesktop.systemd1.Manager",
+ "Virtualization",
+ NULL,
+ &b);
+ if (r < 0)
+ return r;
+
+ if (isempty(b)) {
+ free(b);
+ *v = NULL;
+ return 0;
+ }
+
+ *v = b;
+ return 1;
+}
+
+static int request_handler_machine(
+ struct MHD_Connection *connection,
+ void *connection_cls) {
+
+ _cleanup_(MHD_destroy_responsep) struct MHD_Response *response = NULL;
+ RequestMeta *m = connection_cls;
+ int r;
+ _cleanup_free_ char* hostname = NULL, *os_name = NULL;
+ uint64_t cutoff_from = 0, cutoff_to = 0, usage = 0;
+ sd_id128_t mid, bid;
+ _cleanup_free_ char *v = NULL, *json = NULL;
+
+ assert(connection);
+ assert(m);
+
+ r = open_journal(m);
+ if (r < 0)
+ return mhd_respondf(connection, r, MHD_HTTP_INTERNAL_SERVER_ERROR, "Failed to open journal: %m");
+
+ r = sd_id128_get_machine(&mid);
+ if (r < 0)
+ return mhd_respondf(connection, r, MHD_HTTP_INTERNAL_SERVER_ERROR, "Failed to determine machine ID: %m");
+
+ r = sd_id128_get_boot(&bid);
+ if (r < 0)
+ return mhd_respondf(connection, r, MHD_HTTP_INTERNAL_SERVER_ERROR, "Failed to determine boot ID: %m");
+
+ hostname = gethostname_malloc();
+ if (!hostname)
+ return respond_oom(connection);
+
+ r = sd_journal_get_usage(m->journal, &usage);
+ if (r < 0)
+ return mhd_respondf(connection, r, MHD_HTTP_INTERNAL_SERVER_ERROR, "Failed to determine disk usage: %m");
+
+ r = sd_journal_get_cutoff_realtime_usec(m->journal, &cutoff_from, &cutoff_to);
+ if (r < 0)
+ return mhd_respondf(connection, r, MHD_HTTP_INTERNAL_SERVER_ERROR, "Failed to determine disk usage: %m");
+
+ (void) parse_os_release(NULL, "PRETTY_NAME", &os_name, NULL);
+ (void) get_virtualization(&v);
+
+ r = asprintf(&json,
+ "{ \"machine_id\" : \"" SD_ID128_FORMAT_STR "\","
+ "\"boot_id\" : \"" SD_ID128_FORMAT_STR "\","
+ "\"hostname\" : \"%s\","
+ "\"os_pretty_name\" : \"%s\","
+ "\"virtualization\" : \"%s\","
+ "\"usage\" : \"%"PRIu64"\","
+ "\"cutoff_from_realtime\" : \"%"PRIu64"\","
+ "\"cutoff_to_realtime\" : \"%"PRIu64"\" }\n",
+ SD_ID128_FORMAT_VAL(mid),
+ SD_ID128_FORMAT_VAL(bid),
+ hostname_cleanup(hostname),
+ os_name ? os_name : "Linux",
+ v ? v : "bare",
+ usage,
+ cutoff_from,
+ cutoff_to);
+ if (r < 0)
+ return respond_oom(connection);
+
+ response = MHD_create_response_from_buffer(strlen(json), json, MHD_RESPMEM_MUST_FREE);
+ if (!response)
+ return respond_oom(connection);
+ TAKE_PTR(json);
+
+ MHD_add_response_header(response, "Content-Type", "application/json");
+ return MHD_queue_response(connection, MHD_HTTP_OK, response);
+}
+
+static int request_handler(
+ void *cls,
+ struct MHD_Connection *connection,
+ const char *url,
+ const char *method,
+ const char *version,
+ const char *upload_data,
+ size_t *upload_data_size,
+ void **connection_cls) {
+ int r, code;
+
+ assert(connection);
+ assert(connection_cls);
+ assert(url);
+ assert(method);
+
+ if (!streq(method, "GET"))
+ return mhd_respond(connection, MHD_HTTP_NOT_ACCEPTABLE, "Unsupported method.");
+
+ if (!*connection_cls) {
+ if (!request_meta(connection_cls))
+ return respond_oom(connection);
+ return MHD_YES;
+ }
+
+ if (arg_trust_pem) {
+ r = check_permissions(connection, &code, NULL);
+ if (r < 0)
+ return code;
+ }
+
+ if (streq(url, "/"))
+ return request_handler_redirect(connection, "/browse");
+
+ if (streq(url, "/entries"))
+ return request_handler_entries(connection, *connection_cls);
+
+ if (startswith(url, "/fields/"))
+ return request_handler_fields(connection, url + 8, *connection_cls);
+
+ if (streq(url, "/browse"))
+ return request_handler_file(connection, DOCUMENT_ROOT "/browse.html", "text/html");
+
+ if (streq(url, "/machine"))
+ return request_handler_machine(connection, *connection_cls);
+
+ return mhd_respond(connection, MHD_HTTP_NOT_FOUND, "Not found.");
+}
+
+static int help(void) {
+ _cleanup_free_ char *link = NULL;
+ int r;
+
+ r = terminal_urlify_man("systemd-journal-gatewayd.service", "8", &link);
+ if (r < 0)
+ return log_oom();
+
+ printf("%s [OPTIONS...] ...\n\n"
+ "HTTP server for journal events.\n\n"
+ " -h --help Show this help\n"
+ " --version Show package version\n"
+ " --cert=CERT.PEM Server certificate in PEM format\n"
+ " --key=KEY.PEM Server key in PEM format\n"
+ " --trust=CERT.PEM Certificate authority certificate in PEM format\n"
+ " -D --directory=PATH Serve journal files in directory\n"
+ "\nSee the %s for details.\n"
+ , program_invocation_short_name
+ , link
+ );
+
+ return 0;
+}
+
+static int parse_argv(int argc, char *argv[]) {
+ enum {
+ ARG_VERSION = 0x100,
+ ARG_KEY,
+ ARG_CERT,
+ ARG_TRUST,
+ };
+
+ int r, c;
+
+ static const struct option options[] = {
+ { "help", no_argument, NULL, 'h' },
+ { "version", no_argument, NULL, ARG_VERSION },
+ { "key", required_argument, NULL, ARG_KEY },
+ { "cert", required_argument, NULL, ARG_CERT },
+ { "trust", required_argument, NULL, ARG_TRUST },
+ { "directory", required_argument, NULL, 'D' },
+ {}
+ };
+
+ assert(argc >= 0);
+ assert(argv);
+
+ while ((c = getopt_long(argc, argv, "hD:", options, NULL)) >= 0)
+
+ switch(c) {
+
+ case 'h':
+ return help();
+
+ case ARG_VERSION:
+ return version();
+
+ case ARG_KEY:
+ if (arg_key_pem)
+ return log_error_errno(SYNTHETIC_ERRNO(EINVAL),
+ "Key file specified twice");
+ r = read_full_file(optarg, &arg_key_pem, NULL);
+ if (r < 0)
+ return log_error_errno(r, "Failed to read key file: %m");
+ assert(arg_key_pem);
+ break;
+
+ case ARG_CERT:
+ if (arg_cert_pem)
+ return log_error_errno(SYNTHETIC_ERRNO(EINVAL),
+ "Certificate file specified twice");
+ r = read_full_file(optarg, &arg_cert_pem, NULL);
+ if (r < 0)
+ return log_error_errno(r, "Failed to read certificate file: %m");
+ assert(arg_cert_pem);
+ break;
+
+ case ARG_TRUST:
+#if HAVE_GNUTLS
+ if (arg_trust_pem)
+ return log_error_errno(SYNTHETIC_ERRNO(EINVAL),
+ "CA certificate file specified twice");
+ r = read_full_file(optarg, &arg_trust_pem, NULL);
+ if (r < 0)
+ return log_error_errno(r, "Failed to read CA certificate file: %m");
+ assert(arg_trust_pem);
+ break;
+#else
+ return log_error_errno(SYNTHETIC_ERRNO(EINVAL),
+ "Option --trust is not available.");
+#endif
+ case 'D':
+ arg_directory = optarg;
+ break;
+
+ case '?':
+ return -EINVAL;
+
+ default:
+ assert_not_reached("Unhandled option");
+ }
+
+ if (optind < argc)
+ return log_error_errno(SYNTHETIC_ERRNO(EINVAL),
+ "This program does not take arguments.");
+
+ if (!!arg_key_pem != !!arg_cert_pem)
+ return log_error_errno(SYNTHETIC_ERRNO(EINVAL),
+ "Certificate and key files must be specified together");
+
+ if (arg_trust_pem && !arg_key_pem)
+ return log_error_errno(SYNTHETIC_ERRNO(EINVAL),
+ "CA certificate can only be used with certificate file");
+
+ return 1;
+}
+
+static int run(int argc, char *argv[]) {
+ _cleanup_(MHD_stop_daemonp) struct MHD_Daemon *d = NULL;
+ struct MHD_OptionItem opts[] = {
+ { MHD_OPTION_NOTIFY_COMPLETED,
+ (intptr_t) request_meta_free, NULL },
+ { MHD_OPTION_EXTERNAL_LOGGER,
+ (intptr_t) microhttpd_logger, NULL },
+ { MHD_OPTION_END, 0, NULL },
+ { MHD_OPTION_END, 0, NULL },
+ { MHD_OPTION_END, 0, NULL },
+ { MHD_OPTION_END, 0, NULL },
+ { MHD_OPTION_END, 0, NULL },
+ };
+ int opts_pos = 2;
+
+ /* We force MHD_USE_ITC here, in order to make sure
+ * libmicrohttpd doesn't use shutdown() on our listening
+ * socket, which would break socket re-activation. See
+ *
+ * https://lists.gnu.org/archive/html/libmicrohttpd/2015-09/msg00014.html
+ * https://github.com/systemd/systemd/pull/1286
+ */
+
+ int flags =
+ MHD_USE_DEBUG |
+ MHD_USE_DUAL_STACK |
+ MHD_USE_ITC |
+ MHD_USE_POLL_INTERNAL_THREAD |
+ MHD_USE_THREAD_PER_CONNECTION;
+ int r, n;
+
+ log_setup_service();
+
+ r = parse_argv(argc, argv);
+ if (r <= 0)
+ return r;
+
+ sigbus_install();
+
+ r = setup_gnutls_logger(NULL);
+ if (r < 0)
+ return r;
+
+ n = sd_listen_fds(1);
+ if (n < 0)
+ return log_error_errno(n, "Failed to determine passed sockets: %m");
+ if (n > 1)
+ return log_error_errno(SYNTHETIC_ERRNO(EINVAL), "Can't listen on more than one socket.");
+
+ if (n == 1)
+ opts[opts_pos++] = (struct MHD_OptionItem)
+ { MHD_OPTION_LISTEN_SOCKET, SD_LISTEN_FDS_START };
+
+ if (arg_key_pem) {
+ assert(arg_cert_pem);
+ opts[opts_pos++] = (struct MHD_OptionItem)
+ { MHD_OPTION_HTTPS_MEM_KEY, 0, arg_key_pem };
+ opts[opts_pos++] = (struct MHD_OptionItem)
+ { MHD_OPTION_HTTPS_MEM_CERT, 0, arg_cert_pem };
+ flags |= MHD_USE_TLS;
+ }
+
+ if (arg_trust_pem) {
+ assert(flags & MHD_USE_TLS);
+ opts[opts_pos++] = (struct MHD_OptionItem)
+ { MHD_OPTION_HTTPS_MEM_TRUST, 0, arg_trust_pem };
+ }
+
+ d = MHD_start_daemon(flags, 19531,
+ NULL, NULL,
+ request_handler, NULL,
+ MHD_OPTION_ARRAY, opts,
+ MHD_OPTION_END);
+ if (!d)
+ return log_error_errno(SYNTHETIC_ERRNO(EINVAL), "Failed to start daemon!");
+
+ pause();
+
+ return 0;
+}
+
+DEFINE_MAIN_FUNCTION(run);
diff --git a/src/journal-remote/journal-remote-main.c b/src/journal-remote/journal-remote-main.c
new file mode 100644
index 0000000..802c3ea
--- /dev/null
+++ b/src/journal-remote/journal-remote-main.c
@@ -0,0 +1,1158 @@
+/* SPDX-License-Identifier: LGPL-2.1+ */
+
+#include <getopt.h>
+#include <unistd.h>
+
+#include "sd-daemon.h"
+
+#include "conf-parser.h"
+#include "daemon-util.h"
+#include "def.h"
+#include "fd-util.h"
+#include "fileio.h"
+#include "journal-remote-write.h"
+#include "journal-remote.h"
+#include "main-func.h"
+#include "pretty-print.h"
+#include "process-util.h"
+#include "rlimit-util.h"
+#include "signal-util.h"
+#include "socket-util.h"
+#include "stat-util.h"
+#include "string-table.h"
+#include "strv.h"
+
+#define PRIV_KEY_FILE CERTIFICATE_ROOT "/private/journal-remote.pem"
+#define CERT_FILE CERTIFICATE_ROOT "/certs/journal-remote.pem"
+#define TRUST_FILE CERTIFICATE_ROOT "/ca/trusted.pem"
+
+static const char* arg_url = NULL;
+static const char* arg_getter = NULL;
+static const char* arg_listen_raw = NULL;
+static const char* arg_listen_http = NULL;
+static const char* arg_listen_https = NULL;
+static char** arg_files = NULL; /* Do not free this. */
+static int arg_compress = true;
+static int arg_seal = false;
+static int http_socket = -1, https_socket = -1;
+static char** arg_gnutls_log = NULL;
+
+static JournalWriteSplitMode arg_split_mode = _JOURNAL_WRITE_SPLIT_INVALID;
+static const char* arg_output = NULL;
+
+static char *arg_key = NULL;
+static char *arg_cert = NULL;
+static char *arg_trust = NULL;
+static bool arg_trust_all = false;
+
+STATIC_DESTRUCTOR_REGISTER(arg_gnutls_log, strv_freep);
+STATIC_DESTRUCTOR_REGISTER(arg_key, freep);
+STATIC_DESTRUCTOR_REGISTER(arg_cert, freep);
+STATIC_DESTRUCTOR_REGISTER(arg_trust, freep);
+
+static const char* const journal_write_split_mode_table[_JOURNAL_WRITE_SPLIT_MAX] = {
+ [JOURNAL_WRITE_SPLIT_NONE] = "none",
+ [JOURNAL_WRITE_SPLIT_HOST] = "host",
+};
+
+DEFINE_PRIVATE_STRING_TABLE_LOOKUP(journal_write_split_mode, JournalWriteSplitMode);
+static DEFINE_CONFIG_PARSE_ENUM(config_parse_write_split_mode,
+ journal_write_split_mode,
+ JournalWriteSplitMode,
+ "Failed to parse split mode setting");
+
+/**********************************************************************
+ **********************************************************************
+ **********************************************************************/
+
+static int spawn_child(const char* child, char** argv) {
+ pid_t child_pid;
+ int fd[2], r;
+
+ if (pipe(fd) < 0)
+ return log_error_errno(errno, "Failed to create pager pipe: %m");
+
+ r = safe_fork("(remote)", FORK_RESET_SIGNALS|FORK_DEATHSIG|FORK_LOG, &child_pid);
+ if (r < 0) {
+ safe_close_pair(fd);
+ return r;
+ }
+
+ /* In the child */
+ if (r == 0) {
+ safe_close(fd[0]);
+
+ r = rearrange_stdio(STDIN_FILENO, fd[1], STDERR_FILENO);
+ if (r < 0) {
+ log_error_errno(r, "Failed to dup pipe to stdout: %m");
+ _exit(EXIT_FAILURE);
+ }
+
+ (void) rlimit_nofile_safe();
+
+ execvp(child, argv);
+ log_error_errno(errno, "Failed to exec child %s: %m", child);
+ _exit(EXIT_FAILURE);
+ }
+
+ safe_close(fd[1]);
+
+ r = fd_nonblock(fd[0], true);
+ if (r < 0)
+ log_warning_errno(errno, "Failed to set child pipe to non-blocking: %m");
+
+ return fd[0];
+}
+
+static int spawn_curl(const char* url) {
+ char **argv = STRV_MAKE("curl",
+ "-HAccept: application/vnd.fdo.journal",
+ "--silent",
+ "--show-error",
+ url);
+ int r;
+
+ r = spawn_child("curl", argv);
+ if (r < 0)
+ log_error_errno(r, "Failed to spawn curl: %m");
+ return r;
+}
+
+static int spawn_getter(const char *getter) {
+ int r;
+ _cleanup_strv_free_ char **words = NULL;
+
+ assert(getter);
+ r = strv_split_extract(&words, getter, WHITESPACE, EXTRACT_QUOTES);
+ if (r < 0)
+ return log_error_errno(r, "Failed to split getter option: %m");
+
+ r = spawn_child(words[0], words);
+ if (r < 0)
+ log_error_errno(r, "Failed to spawn getter %s: %m", getter);
+
+ return r;
+}
+
+/**********************************************************************
+ **********************************************************************
+ **********************************************************************/
+
+static int null_timer_event_handler(sd_event_source *s,
+ uint64_t usec,
+ void *userdata);
+static int dispatch_http_event(sd_event_source *event,
+ int fd,
+ uint32_t revents,
+ void *userdata);
+
+static int request_meta(void **connection_cls, int fd, char *hostname) {
+ RemoteSource *source;
+ Writer *writer;
+ int r;
+
+ assert(connection_cls);
+ if (*connection_cls)
+ return 0;
+
+ r = journal_remote_get_writer(journal_remote_server_global, hostname, &writer);
+ if (r < 0)
+ return log_warning_errno(r, "Failed to get writer for source %s: %m",
+ hostname);
+
+ source = source_new(fd, true, hostname, writer);
+ if (!source) {
+ writer_unref(writer);
+ return log_oom();
+ }
+
+ log_debug("Added RemoteSource as connection metadata %p", source);
+
+ *connection_cls = source;
+ return 0;
+}
+
+static void request_meta_free(void *cls,
+ struct MHD_Connection *connection,
+ void **connection_cls,
+ enum MHD_RequestTerminationCode toe) {
+ RemoteSource *s;
+
+ assert(connection_cls);
+ s = *connection_cls;
+
+ if (s) {
+ log_debug("Cleaning up connection metadata %p", s);
+ source_free(s);
+ *connection_cls = NULL;
+ }
+}
+
+static int process_http_upload(
+ struct MHD_Connection *connection,
+ const char *upload_data,
+ size_t *upload_data_size,
+ RemoteSource *source) {
+
+ bool finished = false;
+ size_t remaining;
+ int r;
+
+ assert(source);
+
+ log_trace("%s: connection %p, %zu bytes",
+ __func__, connection, *upload_data_size);
+
+ if (*upload_data_size) {
+ log_trace("Received %zu bytes", *upload_data_size);
+
+ r = journal_importer_push_data(&source->importer,
+ upload_data, *upload_data_size);
+ if (r < 0)
+ return mhd_respond_oom(connection);
+
+ *upload_data_size = 0;
+ } else
+ finished = true;
+
+ for (;;) {
+ r = process_source(source,
+ journal_remote_server_global->compress,
+ journal_remote_server_global->seal);
+ if (r == -EAGAIN)
+ break;
+ if (r < 0) {
+ if (r == -ENOBUFS)
+ log_warning_errno(r, "Entry is above the maximum of %u, aborting connection %p.",
+ DATA_SIZE_MAX, connection);
+ else if (r == -E2BIG)
+ log_warning_errno(r, "Entry with more fields than the maximum of %u, aborting connection %p.",
+ ENTRY_FIELD_COUNT_MAX, connection);
+ else
+ log_warning_errno(r, "Failed to process data, aborting connection %p: %m",
+ connection);
+ return MHD_NO;
+ }
+ }
+
+ if (!finished)
+ return MHD_YES;
+
+ /* The upload is finished */
+
+ remaining = journal_importer_bytes_remaining(&source->importer);
+ if (remaining > 0) {
+ log_warning("Premature EOF byte. %zu bytes lost.", remaining);
+ return mhd_respondf(connection,
+ 0, MHD_HTTP_EXPECTATION_FAILED,
+ "Premature EOF. %zu bytes of trailing data not processed.",
+ remaining);
+ }
+
+ return mhd_respond(connection, MHD_HTTP_ACCEPTED, "OK.");
+};
+
+static int request_handler(
+ void *cls,
+ struct MHD_Connection *connection,
+ const char *url,
+ const char *method,
+ const char *version,
+ const char *upload_data,
+ size_t *upload_data_size,
+ void **connection_cls) {
+
+ const char *header;
+ int r, code, fd;
+ _cleanup_free_ char *hostname = NULL;
+ size_t len;
+
+ assert(connection);
+ assert(connection_cls);
+ assert(url);
+ assert(method);
+
+ log_trace("Handling a connection %s %s %s", method, url, version);
+
+ if (*connection_cls)
+ return process_http_upload(connection,
+ upload_data, upload_data_size,
+ *connection_cls);
+
+ if (!streq(method, "POST"))
+ return mhd_respond(connection, MHD_HTTP_NOT_ACCEPTABLE, "Unsupported method.");
+
+ if (!streq(url, "/upload"))
+ return mhd_respond(connection, MHD_HTTP_NOT_FOUND, "Not found.");
+
+ header = MHD_lookup_connection_value(connection, MHD_HEADER_KIND, "Content-Type");
+ if (!header || !streq(header, "application/vnd.fdo.journal"))
+ return mhd_respond(connection, MHD_HTTP_UNSUPPORTED_MEDIA_TYPE,
+ "Content-Type: application/vnd.fdo.journal is required.");
+
+ header = MHD_lookup_connection_value(connection, MHD_HEADER_KIND, "Content-Length");
+ if (!header)
+ return mhd_respond(connection, MHD_HTTP_LENGTH_REQUIRED,
+ "Content-Length header is required.");
+ r = safe_atozu(header, &len);
+ if (r < 0)
+ return mhd_respondf(connection, r, MHD_HTTP_LENGTH_REQUIRED,
+ "Content-Length: %s cannot be parsed: %m", header);
+
+ if (len > ENTRY_SIZE_MAX)
+ /* When serialized, an entry of maximum size might be slightly larger,
+ * so this does not correspond exactly to the limit in journald. Oh well.
+ */
+ return mhd_respondf(connection, 0, MHD_HTTP_PAYLOAD_TOO_LARGE,
+ "Payload larger than maximum size of %u bytes", ENTRY_SIZE_MAX);
+
+ {
+ const union MHD_ConnectionInfo *ci;
+
+ ci = MHD_get_connection_info(connection,
+ MHD_CONNECTION_INFO_CONNECTION_FD);
+ if (!ci) {
+ log_error("MHD_get_connection_info failed: cannot get remote fd");
+ return mhd_respond(connection, MHD_HTTP_INTERNAL_SERVER_ERROR,
+ "Cannot check remote address.");
+ }
+
+ fd = ci->connect_fd;
+ assert(fd >= 0);
+ }
+
+ if (journal_remote_server_global->check_trust) {
+ r = check_permissions(connection, &code, &hostname);
+ if (r < 0)
+ return code;
+ } else {
+ r = getpeername_pretty(fd, false, &hostname);
+ if (r < 0)
+ return mhd_respond(connection, MHD_HTTP_INTERNAL_SERVER_ERROR,
+ "Cannot check remote hostname.");
+ }
+
+ assert(hostname);
+
+ r = request_meta(connection_cls, fd, hostname);
+ if (r == -ENOMEM)
+ return respond_oom(connection);
+ else if (r < 0)
+ return mhd_respondf(connection, r, MHD_HTTP_INTERNAL_SERVER_ERROR, "%m");
+
+ hostname = NULL;
+ return MHD_YES;
+}
+
+static int setup_microhttpd_server(RemoteServer *s,
+ int fd,
+ const char *key,
+ const char *cert,
+ const char *trust) {
+ struct MHD_OptionItem opts[] = {
+ { MHD_OPTION_NOTIFY_COMPLETED, (intptr_t) request_meta_free},
+ { MHD_OPTION_EXTERNAL_LOGGER, (intptr_t) microhttpd_logger},
+ { MHD_OPTION_LISTEN_SOCKET, fd},
+ { MHD_OPTION_CONNECTION_MEMORY_LIMIT, 128*1024},
+ { MHD_OPTION_END},
+ { MHD_OPTION_END},
+ { MHD_OPTION_END},
+ { MHD_OPTION_END},
+ { MHD_OPTION_END}};
+ int opts_pos = 4;
+ int flags =
+ MHD_USE_DEBUG |
+ MHD_USE_DUAL_STACK |
+ MHD_USE_EPOLL |
+ MHD_USE_ITC;
+
+ const union MHD_DaemonInfo *info;
+ int r, epoll_fd;
+ MHDDaemonWrapper *d;
+
+ assert(fd >= 0);
+
+ r = fd_nonblock(fd, true);
+ if (r < 0)
+ return log_error_errno(r, "Failed to make fd:%d nonblocking: %m", fd);
+
+/* MHD_OPTION_STRICT_FOR_CLIENT is introduced in microhttpd 0.9.54,
+ * and MHD_USE_PEDANTIC_CHECKS will be deprecated in future.
+ * If MHD_USE_PEDANTIC_CHECKS is '#define'd, then it is deprecated
+ * and we should use MHD_OPTION_STRICT_FOR_CLIENT. On the other hand,
+ * if MHD_USE_PEDANTIC_CHECKS is not '#define'd, then it is not
+ * deprecated yet and there exists an enum element with the same name.
+ * So we can safely use it. */
+#ifdef MHD_USE_PEDANTIC_CHECKS
+ opts[opts_pos++] = (struct MHD_OptionItem)
+ {MHD_OPTION_STRICT_FOR_CLIENT, 1};
+#else
+ flags |= MHD_USE_PEDANTIC_CHECKS;
+#endif
+
+ if (key) {
+ assert(cert);
+
+ opts[opts_pos++] = (struct MHD_OptionItem)
+ {MHD_OPTION_HTTPS_MEM_KEY, 0, (char*) key};
+ opts[opts_pos++] = (struct MHD_OptionItem)
+ {MHD_OPTION_HTTPS_MEM_CERT, 0, (char*) cert};
+
+ flags |= MHD_USE_TLS;
+
+ if (trust)
+ opts[opts_pos++] = (struct MHD_OptionItem)
+ {MHD_OPTION_HTTPS_MEM_TRUST, 0, (char*) trust};
+ }
+
+ d = new(MHDDaemonWrapper, 1);
+ if (!d)
+ return log_oom();
+
+ d->fd = (uint64_t) fd;
+
+ d->daemon = MHD_start_daemon(flags, 0,
+ NULL, NULL,
+ request_handler, NULL,
+ MHD_OPTION_ARRAY, opts,
+ MHD_OPTION_END);
+ if (!d->daemon) {
+ log_error("Failed to start µhttp daemon");
+ r = -EINVAL;
+ goto error;
+ }
+
+ log_debug("Started MHD %s daemon on fd:%d (wrapper @ %p)",
+ key ? "HTTPS" : "HTTP", fd, d);
+
+ info = MHD_get_daemon_info(d->daemon, MHD_DAEMON_INFO_EPOLL_FD_LINUX_ONLY);
+ if (!info) {
+ log_error("µhttp returned NULL daemon info");
+ r = -EOPNOTSUPP;
+ goto error;
+ }
+
+ epoll_fd = info->listen_fd;
+ if (epoll_fd < 0) {
+ log_error("µhttp epoll fd is invalid");
+ r = -EUCLEAN;
+ goto error;
+ }
+
+ r = sd_event_add_io(s->events, &d->io_event,
+ epoll_fd, EPOLLIN,
+ dispatch_http_event, d);
+ if (r < 0) {
+ log_error_errno(r, "Failed to add event callback: %m");
+ goto error;
+ }
+
+ r = sd_event_source_set_description(d->io_event, "io_event");
+ if (r < 0) {
+ log_error_errno(r, "Failed to set source name: %m");
+ goto error;
+ }
+
+ r = sd_event_add_time(s->events, &d->timer_event,
+ CLOCK_MONOTONIC, (uint64_t) -1, 0,
+ null_timer_event_handler, d);
+ if (r < 0) {
+ log_error_errno(r, "Failed to add timer_event: %m");
+ goto error;
+ }
+
+ r = sd_event_source_set_description(d->timer_event, "timer_event");
+ if (r < 0) {
+ log_error_errno(r, "Failed to set source name: %m");
+ goto error;
+ }
+
+ r = hashmap_ensure_allocated(&s->daemons, &uint64_hash_ops);
+ if (r < 0) {
+ log_oom();
+ goto error;
+ }
+
+ r = hashmap_put(s->daemons, &d->fd, d);
+ if (r < 0) {
+ log_error_errno(r, "Failed to add daemon to hashmap: %m");
+ goto error;
+ }
+
+ s->active++;
+ return 0;
+
+error:
+ MHD_stop_daemon(d->daemon);
+ free(d->daemon);
+ free(d);
+ return r;
+}
+
+static int setup_microhttpd_socket(RemoteServer *s,
+ const char *address,
+ const char *key,
+ const char *cert,
+ const char *trust) {
+ int fd;
+
+ fd = make_socket_fd(LOG_DEBUG, address, SOCK_STREAM, SOCK_CLOEXEC);
+ if (fd < 0)
+ return fd;
+
+ return setup_microhttpd_server(s, fd, key, cert, trust);
+}
+
+static int null_timer_event_handler(sd_event_source *timer_event,
+ uint64_t usec,
+ void *userdata) {
+ return dispatch_http_event(timer_event, 0, 0, userdata);
+}
+
+static int dispatch_http_event(sd_event_source *event,
+ int fd,
+ uint32_t revents,
+ void *userdata) {
+ MHDDaemonWrapper *d = userdata;
+ int r;
+ MHD_UNSIGNED_LONG_LONG timeout = ULONG_LONG_MAX;
+
+ assert(d);
+
+ r = MHD_run(d->daemon);
+ if (r == MHD_NO)
+ // FIXME: unregister daemon
+ return log_error_errno(SYNTHETIC_ERRNO(EINVAL),
+ "MHD_run failed!");
+ if (MHD_get_timeout(d->daemon, &timeout) == MHD_NO)
+ timeout = ULONG_LONG_MAX;
+
+ r = sd_event_source_set_time(d->timer_event, timeout);
+ if (r < 0) {
+ log_warning_errno(r, "Unable to set event loop timeout: %m, this may result in indefinite blocking!");
+ return 1;
+ }
+
+ r = sd_event_source_set_enabled(d->timer_event, SD_EVENT_ON);
+ if (r < 0)
+ log_warning_errno(r, "Unable to enable timer_event: %m, this may result in indefinite blocking!");
+
+ return 1; /* work to do */
+}
+
+/**********************************************************************
+ **********************************************************************
+ **********************************************************************/
+
+static int setup_signals(RemoteServer *s) {
+ int r;
+
+ assert(s);
+
+ assert_se(sigprocmask_many(SIG_SETMASK, NULL, SIGINT, SIGTERM, -1) >= 0);
+
+ r = sd_event_add_signal(s->events, &s->sigterm_event, SIGTERM, NULL, s);
+ if (r < 0)
+ return r;
+
+ r = sd_event_add_signal(s->events, &s->sigint_event, SIGINT, NULL, s);
+ if (r < 0)
+ return r;
+
+ return 0;
+}
+
+static int setup_raw_socket(RemoteServer *s, const char *address) {
+ int fd;
+
+ fd = make_socket_fd(LOG_INFO, address, SOCK_STREAM, SOCK_CLOEXEC);
+ if (fd < 0)
+ return fd;
+
+ return journal_remote_add_raw_socket(s, fd);
+}
+
+static int create_remoteserver(
+ RemoteServer *s,
+ const char* key,
+ const char* cert,
+ const char* trust) {
+
+ int r, n, fd;
+ char **file;
+
+ r = journal_remote_server_init(s, arg_output, arg_split_mode, arg_compress, arg_seal);
+ if (r < 0)
+ return r;
+
+ r = setup_signals(s);
+ if (r < 0)
+ return log_error_errno(r, "Failed to set up signals: %m");
+
+ n = sd_listen_fds(true);
+ if (n < 0)
+ return log_error_errno(n, "Failed to read listening file descriptors from environment: %m");
+ else
+ log_debug("Received %d descriptors", n);
+
+ if (MAX(http_socket, https_socket) >= SD_LISTEN_FDS_START + n)
+ return log_error_errno(SYNTHETIC_ERRNO(EBADFD),
+ "Received fewer sockets than expected");
+
+ for (fd = SD_LISTEN_FDS_START; fd < SD_LISTEN_FDS_START + n; fd++) {
+ if (sd_is_socket(fd, AF_UNSPEC, 0, true)) {
+ log_debug("Received a listening socket (fd:%d)", fd);
+
+ if (fd == http_socket)
+ r = setup_microhttpd_server(s, fd, NULL, NULL, NULL);
+ else if (fd == https_socket)
+ r = setup_microhttpd_server(s, fd, key, cert, trust);
+ else
+ r = journal_remote_add_raw_socket(s, fd);
+ } else if (sd_is_socket(fd, AF_UNSPEC, 0, false)) {
+ char *hostname;
+
+ r = getpeername_pretty(fd, false, &hostname);
+ if (r < 0)
+ return log_error_errno(r, "Failed to retrieve remote name: %m");
+
+ log_debug("Received a connection socket (fd:%d) from %s", fd, hostname);
+
+ r = journal_remote_add_source(s, fd, hostname, true);
+ } else
+ return log_error_errno(SYNTHETIC_ERRNO(EINVAL),
+ "Unknown socket passed on fd:%d", fd);
+
+ if (r < 0)
+ return log_error_errno(r, "Failed to register socket (fd:%d): %m", fd);
+ }
+
+ if (arg_getter) {
+ log_info("Spawning getter %s...", arg_getter);
+ fd = spawn_getter(arg_getter);
+ if (fd < 0)
+ return fd;
+
+ r = journal_remote_add_source(s, fd, (char*) arg_output, false);
+ if (r < 0)
+ return r;
+ }
+
+ if (arg_url) {
+ const char *url, *hostname;
+
+ if (!strstr(arg_url, "/entries")) {
+ if (endswith(arg_url, "/"))
+ url = strjoina(arg_url, "entries");
+ else
+ url = strjoina(arg_url, "/entries");
+ } else
+ url = strdupa(arg_url);
+
+ log_info("Spawning curl %s...", url);
+ fd = spawn_curl(url);
+ if (fd < 0)
+ return fd;
+
+ hostname = STARTSWITH_SET(arg_url, "https://", "http://");
+ if (!hostname)
+ hostname = arg_url;
+
+ hostname = strndupa(hostname, strcspn(hostname, "/:"));
+
+ r = journal_remote_add_source(s, fd, (char *) hostname, false);
+ if (r < 0)
+ return r;
+ }
+
+ if (arg_listen_raw) {
+ log_debug("Listening on a socket...");
+ r = setup_raw_socket(s, arg_listen_raw);
+ if (r < 0)
+ return r;
+ }
+
+ if (arg_listen_http) {
+ r = setup_microhttpd_socket(s, arg_listen_http, NULL, NULL, NULL);
+ if (r < 0)
+ return r;
+ }
+
+ if (arg_listen_https) {
+ r = setup_microhttpd_socket(s, arg_listen_https, key, cert, trust);
+ if (r < 0)
+ return r;
+ }
+
+ STRV_FOREACH(file, arg_files) {
+ const char *output_name;
+
+ if (streq(*file, "-")) {
+ log_debug("Using standard input as source.");
+
+ fd = STDIN_FILENO;
+ output_name = "stdin";
+ } else {
+ log_debug("Reading file %s...", *file);
+
+ fd = open(*file, O_RDONLY|O_CLOEXEC|O_NOCTTY|O_NONBLOCK);
+ if (fd < 0)
+ return log_error_errno(errno, "Failed to open %s: %m", *file);
+ output_name = *file;
+ }
+
+ r = journal_remote_add_source(s, fd, (char*) output_name, false);
+ if (r < 0)
+ return r;
+ }
+
+ if (s->active == 0)
+ return log_error_errno(SYNTHETIC_ERRNO(EINVAL),
+ "Zero sources specified");
+
+ if (arg_split_mode == JOURNAL_WRITE_SPLIT_NONE) {
+ /* In this case we know what the writer will be
+ called, so we can create it and verify that we can
+ create output as expected. */
+ r = journal_remote_get_writer(s, NULL, &s->_single_writer);
+ if (r < 0)
+ return r;
+ }
+
+ return 0;
+}
+
+static int negative_fd(const char *spec) {
+ /* Return a non-positive number as its inverse, -EINVAL otherwise. */
+
+ int fd, r;
+
+ r = safe_atoi(spec, &fd);
+ if (r < 0)
+ return r;
+
+ if (fd > 0)
+ return -EINVAL;
+ else
+ return -fd;
+}
+
+static int parse_config(void) {
+ const ConfigTableItem items[] = {
+ { "Remote", "Seal", config_parse_bool, 0, &arg_seal },
+ { "Remote", "SplitMode", config_parse_write_split_mode, 0, &arg_split_mode },
+ { "Remote", "ServerKeyFile", config_parse_path, 0, &arg_key },
+ { "Remote", "ServerCertificateFile", config_parse_path, 0, &arg_cert },
+ { "Remote", "TrustedCertificateFile", config_parse_path, 0, &arg_trust },
+ {}
+ };
+
+ return config_parse_many_nulstr(PKGSYSCONFDIR "/journal-remote.conf",
+ CONF_PATHS_NULSTR("systemd/journal-remote.conf.d"),
+ "Remote\0", config_item_table_lookup, items,
+ CONFIG_PARSE_WARN, NULL);
+}
+
+static int help(void) {
+ _cleanup_free_ char *link = NULL;
+ int r;
+
+ r = terminal_urlify_man("systemd-journal-remote.service", "8", &link);
+ if (r < 0)
+ return log_oom();
+
+ printf("%s [OPTIONS...] {FILE|-}...\n\n"
+ "Write external journal events to journal file(s).\n\n"
+ " -h --help Show this help\n"
+ " --version Show package version\n"
+ " --url=URL Read events from systemd-journal-gatewayd at URL\n"
+ " --getter=COMMAND Read events from the output of COMMAND\n"
+ " --listen-raw=ADDR Listen for connections at ADDR\n"
+ " --listen-http=ADDR Listen for HTTP connections at ADDR\n"
+ " --listen-https=ADDR Listen for HTTPS connections at ADDR\n"
+ " -o --output=FILE|DIR Write output to FILE or DIR/external-*.journal\n"
+ " --compress[=BOOL] XZ-compress the output journal (default: yes)\n"
+ " --seal[=BOOL] Use event sealing (default: no)\n"
+ " --key=FILENAME SSL key in PEM format (default:\n"
+ " \"" PRIV_KEY_FILE "\")\n"
+ " --cert=FILENAME SSL certificate in PEM format (default:\n"
+ " \"" CERT_FILE "\")\n"
+ " --trust=FILENAME|all SSL CA certificate or disable checking (default:\n"
+ " \"" TRUST_FILE "\")\n"
+ " --gnutls-log=CATEGORY...\n"
+ " Specify a list of gnutls logging categories\n"
+ " --split-mode=none|host How many output files to create\n"
+ "\nNote: file descriptors from sd_listen_fds() will be consumed, too.\n"
+ "\nSee the %s for details.\n"
+ , program_invocation_short_name
+ , link
+ );
+
+ return 0;
+}
+
+static int parse_argv(int argc, char *argv[]) {
+ enum {
+ ARG_VERSION = 0x100,
+ ARG_URL,
+ ARG_LISTEN_RAW,
+ ARG_LISTEN_HTTP,
+ ARG_LISTEN_HTTPS,
+ ARG_GETTER,
+ ARG_SPLIT_MODE,
+ ARG_COMPRESS,
+ ARG_SEAL,
+ ARG_KEY,
+ ARG_CERT,
+ ARG_TRUST,
+ ARG_GNUTLS_LOG,
+ };
+
+ static const struct option options[] = {
+ { "help", no_argument, NULL, 'h' },
+ { "version", no_argument, NULL, ARG_VERSION },
+ { "url", required_argument, NULL, ARG_URL },
+ { "getter", required_argument, NULL, ARG_GETTER },
+ { "listen-raw", required_argument, NULL, ARG_LISTEN_RAW },
+ { "listen-http", required_argument, NULL, ARG_LISTEN_HTTP },
+ { "listen-https", required_argument, NULL, ARG_LISTEN_HTTPS },
+ { "output", required_argument, NULL, 'o' },
+ { "split-mode", required_argument, NULL, ARG_SPLIT_MODE },
+ { "compress", optional_argument, NULL, ARG_COMPRESS },
+ { "seal", optional_argument, NULL, ARG_SEAL },
+ { "key", required_argument, NULL, ARG_KEY },
+ { "cert", required_argument, NULL, ARG_CERT },
+ { "trust", required_argument, NULL, ARG_TRUST },
+ { "gnutls-log", required_argument, NULL, ARG_GNUTLS_LOG },
+ {}
+ };
+
+ int c, r;
+ bool type_a, type_b;
+
+ assert(argc >= 0);
+ assert(argv);
+
+ while ((c = getopt_long(argc, argv, "ho:", options, NULL)) >= 0)
+ switch(c) {
+
+ case 'h':
+ return help();
+
+ case ARG_VERSION:
+ return version();
+
+ case ARG_URL:
+ if (arg_url)
+ return log_error_errno(SYNTHETIC_ERRNO(EINVAL),
+ "cannot currently set more than one --url");
+
+ arg_url = optarg;
+ break;
+
+ case ARG_GETTER:
+ if (arg_getter)
+ return log_error_errno(SYNTHETIC_ERRNO(EINVAL),
+ "cannot currently use --getter more than once");
+
+ arg_getter = optarg;
+ break;
+
+ case ARG_LISTEN_RAW:
+ if (arg_listen_raw)
+ return log_error_errno(SYNTHETIC_ERRNO(EINVAL),
+ "cannot currently use --listen-raw more than once");
+
+ arg_listen_raw = optarg;
+ break;
+
+ case ARG_LISTEN_HTTP:
+ if (arg_listen_http || http_socket >= 0)
+ return log_error_errno(SYNTHETIC_ERRNO(EINVAL),
+ "cannot currently use --listen-http more than once");
+
+ r = negative_fd(optarg);
+ if (r >= 0)
+ http_socket = r;
+ else
+ arg_listen_http = optarg;
+ break;
+
+ case ARG_LISTEN_HTTPS:
+ if (arg_listen_https || https_socket >= 0)
+ return log_error_errno(SYNTHETIC_ERRNO(EINVAL),
+ "cannot currently use --listen-https more than once");
+
+ r = negative_fd(optarg);
+ if (r >= 0)
+ https_socket = r;
+ else
+ arg_listen_https = optarg;
+
+ break;
+
+ case ARG_KEY:
+ if (arg_key)
+ return log_error_errno(SYNTHETIC_ERRNO(EINVAL),
+ "Key file specified twice");
+
+ arg_key = strdup(optarg);
+ if (!arg_key)
+ return log_oom();
+
+ break;
+
+ case ARG_CERT:
+ if (arg_cert)
+ return log_error_errno(SYNTHETIC_ERRNO(EINVAL),
+ "Certificate file specified twice");
+
+ arg_cert = strdup(optarg);
+ if (!arg_cert)
+ return log_oom();
+
+ break;
+
+ case ARG_TRUST:
+ if (arg_trust || arg_trust_all)
+ return log_error_errno(SYNTHETIC_ERRNO(EINVAL),
+ "Confusing trusted CA configuration");
+
+ if (streq(optarg, "all"))
+ arg_trust_all = true;
+ else {
+#if HAVE_GNUTLS
+ arg_trust = strdup(optarg);
+ if (!arg_trust)
+ return log_oom();
+#else
+ return log_error_errno(SYNTHETIC_ERRNO(EINVAL),
+ "Option --trust is not available.");
+#endif
+ }
+
+ break;
+
+ case 'o':
+ if (arg_output)
+ return log_error_errno(SYNTHETIC_ERRNO(EINVAL),
+ "cannot use --output/-o more than once");
+
+ arg_output = optarg;
+ break;
+
+ case ARG_SPLIT_MODE:
+ arg_split_mode = journal_write_split_mode_from_string(optarg);
+ if (arg_split_mode == _JOURNAL_WRITE_SPLIT_INVALID)
+ return log_error_errno(SYNTHETIC_ERRNO(EINVAL),
+ "Invalid split mode: %s", optarg);
+ break;
+
+ case ARG_COMPRESS:
+ if (optarg) {
+ r = parse_boolean(optarg);
+ if (r < 0)
+ return log_error_errno(SYNTHETIC_ERRNO(EINVAL),
+ "Failed to parse --compress= parameter.");
+
+ arg_compress = !!r;
+ } else
+ arg_compress = true;
+
+ break;
+
+ case ARG_SEAL:
+ if (optarg) {
+ r = parse_boolean(optarg);
+ if (r < 0)
+ return log_error_errno(SYNTHETIC_ERRNO(EINVAL),
+ "Failed to parse --seal= parameter.");
+
+ arg_seal = !!r;
+ } else
+ arg_seal = true;
+
+ break;
+
+ case ARG_GNUTLS_LOG: {
+#if HAVE_GNUTLS
+ const char* p = optarg;
+ for (;;) {
+ _cleanup_free_ char *word = NULL;
+
+ r = extract_first_word(&p, &word, ",", 0);
+ if (r < 0)
+ return log_error_errno(r, "Failed to parse --gnutls-log= argument: %m");
+ if (r == 0)
+ break;
+
+ if (strv_push(&arg_gnutls_log, word) < 0)
+ return log_oom();
+
+ word = NULL;
+ }
+ break;
+#else
+ return log_error_errno(SYNTHETIC_ERRNO(EINVAL),
+ "Option --gnutls-log is not available.");
+#endif
+ }
+
+ case '?':
+ return -EINVAL;
+
+ default:
+ assert_not_reached("Unknown option code.");
+ }
+
+ if (optind < argc)
+ arg_files = argv + optind;
+
+ type_a = arg_getter || !strv_isempty(arg_files);
+ type_b = arg_url
+ || arg_listen_raw
+ || arg_listen_http || arg_listen_https
+ || sd_listen_fds(false) > 0;
+ if (type_a && type_b)
+ return log_error_errno(SYNTHETIC_ERRNO(EINVAL),
+ "Cannot use file input or --getter with "
+ "--arg-listen-... or socket activation.");
+ if (type_a) {
+ if (!arg_output)
+ return log_error_errno(SYNTHETIC_ERRNO(EINVAL),
+ "Option --output must be specified with file input or --getter.");
+
+ if (!IN_SET(arg_split_mode, JOURNAL_WRITE_SPLIT_NONE, _JOURNAL_WRITE_SPLIT_INVALID))
+ return log_error_errno(SYNTHETIC_ERRNO(EINVAL),
+ "For active sources, only --split-mode=none is allowed.");
+
+ arg_split_mode = JOURNAL_WRITE_SPLIT_NONE;
+ }
+
+ if (arg_split_mode == _JOURNAL_WRITE_SPLIT_INVALID)
+ arg_split_mode = JOURNAL_WRITE_SPLIT_HOST;
+
+ if (arg_split_mode == JOURNAL_WRITE_SPLIT_NONE && arg_output) {
+ if (is_dir(arg_output, true) > 0)
+ return log_error_errno(SYNTHETIC_ERRNO(EINVAL),
+ "For SplitMode=none, output must be a file.");
+ if (!endswith(arg_output, ".journal"))
+ return log_error_errno(SYNTHETIC_ERRNO(EINVAL),
+ "For SplitMode=none, output file name must end with .journal.");
+ }
+
+ if (arg_split_mode == JOURNAL_WRITE_SPLIT_HOST
+ && arg_output && is_dir(arg_output, true) <= 0)
+ return log_error_errno(SYNTHETIC_ERRNO(EINVAL),
+ "For SplitMode=host, output must be a directory.");
+
+ log_debug("Full config: SplitMode=%s Key=%s Cert=%s Trust=%s",
+ journal_write_split_mode_to_string(arg_split_mode),
+ strna(arg_key),
+ strna(arg_cert),
+ strna(arg_trust));
+
+ return 1 /* work to do */;
+}
+
+static int load_certificates(char **key, char **cert, char **trust) {
+ int r;
+
+ r = read_full_file(arg_key ?: PRIV_KEY_FILE, key, NULL);
+ if (r < 0)
+ return log_error_errno(r, "Failed to read key from file '%s': %m",
+ arg_key ?: PRIV_KEY_FILE);
+
+ r = read_full_file(arg_cert ?: CERT_FILE, cert, NULL);
+ if (r < 0)
+ return log_error_errno(r, "Failed to read certificate from file '%s': %m",
+ arg_cert ?: CERT_FILE);
+
+ if (arg_trust_all)
+ log_info("Certificate checking disabled.");
+ else {
+ r = read_full_file(arg_trust ?: TRUST_FILE, trust, NULL);
+ if (r < 0)
+ return log_error_errno(r, "Failed to read CA certificate file '%s': %m",
+ arg_trust ?: TRUST_FILE);
+ }
+
+ if ((arg_listen_raw || arg_listen_http) && *trust)
+ return log_error_errno(SYNTHETIC_ERRNO(EINVAL),
+ "Option --trust makes all non-HTTPS connections untrusted.");
+
+ return 0;
+}
+
+static int run(int argc, char **argv) {
+ _cleanup_(notify_on_cleanup) const char *notify_message = NULL;
+ _cleanup_(journal_remote_server_destroy) RemoteServer s = {};
+ _cleanup_free_ char *key = NULL, *cert = NULL, *trust = NULL;
+ int r;
+
+ log_show_color(true);
+ log_parse_environment();
+
+ /* The journal merging logic potentially needs a lot of fds. */
+ (void) rlimit_nofile_bump(HIGH_RLIMIT_NOFILE);
+
+ r = parse_config();
+ if (r < 0)
+ return r;
+
+ r = parse_argv(argc, argv);
+ if (r <= 0)
+ return r;
+
+ if (arg_listen_http || arg_listen_https) {
+ r = setup_gnutls_logger(arg_gnutls_log);
+ if (r < 0)
+ return r;
+ }
+
+ if (arg_listen_https || https_socket >= 0) {
+ r = load_certificates(&key, &cert, &trust);
+ if (r < 0)
+ return r;
+
+ s.check_trust = !arg_trust_all;
+ }
+
+ r = create_remoteserver(&s, key, cert, trust);
+ if (r < 0)
+ return r;
+
+ r = sd_event_set_watchdog(s.events, true);
+ if (r < 0)
+ return log_error_errno(r, "Failed to enable watchdog: %m");
+
+ log_debug("Watchdog is %sd.", enable_disable(r > 0));
+
+ log_debug("%s running as pid "PID_FMT,
+ program_invocation_short_name, getpid_cached());
+
+ notify_message = notify_start(NOTIFY_READY, NOTIFY_STOPPING);
+
+ while (s.active) {
+ r = sd_event_get_state(s.events);
+ if (r < 0)
+ return r;
+ if (r == SD_EVENT_FINISHED)
+ break;
+
+ r = sd_event_run(s.events, -1);
+ if (r < 0)
+ return log_error_errno(r, "Failed to run event loop: %m");
+ }
+
+ notify_message = NULL;
+ (void) sd_notifyf(false,
+ "STOPPING=1\n"
+ "STATUS=Shutting down after writing %" PRIu64 " entries...", s.event_count);
+
+ log_info("Finishing after writing %" PRIu64 " entries", s.event_count);
+
+ return 0;
+}
+
+DEFINE_MAIN_FUNCTION(run);
diff --git a/src/journal-remote/journal-remote-parse.c b/src/journal-remote/journal-remote-parse.c
new file mode 100644
index 0000000..535d06a
--- /dev/null
+++ b/src/journal-remote/journal-remote-parse.c
@@ -0,0 +1,84 @@
+/* SPDX-License-Identifier: LGPL-2.1+ */
+
+#include "alloc-util.h"
+#include "fd-util.h"
+#include "journal-remote-parse.h"
+#include "journald-native.h"
+#include "parse-util.h"
+#include "string-util.h"
+
+void source_free(RemoteSource *source) {
+ if (!source)
+ return;
+
+ journal_importer_cleanup(&source->importer);
+
+ log_debug("Writer ref count %i", source->writer->n_ref);
+ writer_unref(source->writer);
+
+ sd_event_source_unref(source->event);
+ sd_event_source_unref(source->buffer_event);
+
+ free(source);
+}
+
+/**
+ * Initialize zero-filled source with given values. On success, takes
+ * ownership of fd, name, and writer, otherwise does not touch them.
+ */
+RemoteSource* source_new(int fd, bool passive_fd, char *name, Writer *writer) {
+ RemoteSource *source;
+
+ log_debug("Creating source for %sfd:%d (%s)",
+ passive_fd ? "passive " : "", fd, name);
+
+ assert(fd >= 0);
+
+ source = new0(RemoteSource, 1);
+ if (!source)
+ return NULL;
+
+ source->importer.fd = fd;
+ source->importer.passive_fd = passive_fd;
+ source->importer.name = name;
+
+ source->writer = writer;
+
+ return source;
+}
+
+int process_source(RemoteSource *source, bool compress, bool seal) {
+ int r;
+
+ assert(source);
+ assert(source->writer);
+
+ r = journal_importer_process_data(&source->importer);
+ if (r <= 0)
+ return r;
+
+ /* We have a full event */
+ log_trace("Received full event from source@%p fd:%d (%s)",
+ source, source->importer.fd, source->importer.name);
+
+ if (source->importer.iovw.count == 0) {
+ log_warning("Entry with no payload, skipping");
+ goto freeing;
+ }
+
+ assert(source->importer.iovw.iovec);
+
+ r = writer_write(source->writer, &source->importer.iovw, &source->importer.ts, compress, seal);
+ if (r == -EBADMSG) {
+ log_error_errno(r, "Entry is invalid, ignoring.");
+ r = 0;
+ } else if (r < 0)
+ log_error_errno(r, "Failed to write entry of %zu bytes: %m",
+ iovw_size(&source->importer.iovw));
+ else
+ r = 1;
+
+ freeing:
+ journal_importer_drop_iovw(&source->importer);
+ return r;
+}
diff --git a/src/journal-remote/journal-remote-parse.h b/src/journal-remote/journal-remote-parse.h
new file mode 100644
index 0000000..d6c7736
--- /dev/null
+++ b/src/journal-remote/journal-remote-parse.h
@@ -0,0 +1,20 @@
+/* SPDX-License-Identifier: LGPL-2.1+ */
+#pragma once
+
+#include "sd-event.h"
+
+#include "journal-importer.h"
+#include "journal-remote-write.h"
+
+typedef struct RemoteSource {
+ JournalImporter importer;
+
+ Writer *writer;
+
+ sd_event_source *event;
+ sd_event_source *buffer_event;
+} RemoteSource;
+
+RemoteSource* source_new(int fd, bool passive_fd, char *name, Writer *writer);
+void source_free(RemoteSource *source);
+int process_source(RemoteSource *source, bool compress, bool seal);
diff --git a/src/journal-remote/journal-remote-write.c b/src/journal-remote/journal-remote-write.c
new file mode 100644
index 0000000..188ff35
--- /dev/null
+++ b/src/journal-remote/journal-remote-write.c
@@ -0,0 +1,105 @@
+/* SPDX-License-Identifier: LGPL-2.1+ */
+
+#include "alloc-util.h"
+#include "journal-remote.h"
+
+static int do_rotate(JournalFile **f, bool compress, bool seal) {
+ int r = journal_file_rotate(f, compress, (uint64_t) -1, seal, NULL);
+ if (r < 0) {
+ if (*f)
+ log_error_errno(r, "Failed to rotate %s: %m", (*f)->path);
+ else
+ log_error_errno(r, "Failed to create rotated journal: %m");
+ }
+
+ return r;
+}
+
+Writer* writer_new(RemoteServer *server) {
+ Writer *w;
+
+ w = new0(Writer, 1);
+ if (!w)
+ return NULL;
+
+ memset(&w->metrics, 0xFF, sizeof(w->metrics));
+
+ w->mmap = mmap_cache_new();
+ if (!w->mmap)
+ return mfree(w);
+
+ w->n_ref = 1;
+ w->server = server;
+
+ return w;
+}
+
+static Writer* writer_free(Writer *w) {
+ if (!w)
+ return NULL;
+
+ if (w->journal) {
+ log_debug("Closing journal file %s.", w->journal->path);
+ journal_file_close(w->journal);
+ }
+
+ if (w->server && w->hashmap_key)
+ hashmap_remove(w->server->writers, w->hashmap_key);
+
+ free(w->hashmap_key);
+
+ if (w->mmap)
+ mmap_cache_unref(w->mmap);
+
+ return mfree(w);
+}
+
+DEFINE_TRIVIAL_REF_UNREF_FUNC(Writer, writer, writer_free);
+
+int writer_write(Writer *w,
+ struct iovec_wrapper *iovw,
+ dual_timestamp *ts,
+ bool compress,
+ bool seal) {
+ int r;
+
+ assert(w);
+ assert(iovw);
+ assert(iovw->count > 0);
+
+ if (journal_file_rotate_suggested(w->journal, 0)) {
+ log_info("%s: Journal header limits reached or header out-of-date, rotating",
+ w->journal->path);
+ r = do_rotate(&w->journal, compress, seal);
+ if (r < 0)
+ return r;
+ }
+
+ r = journal_file_append_entry(w->journal, ts, NULL,
+ iovw->iovec, iovw->count,
+ &w->seqnum, NULL, NULL);
+ if (r >= 0) {
+ if (w->server)
+ w->server->event_count += 1;
+ return 0;
+ } else if (r == -EBADMSG)
+ return r;
+
+ log_debug_errno(r, "%s: Write failed, rotating: %m", w->journal->path);
+ r = do_rotate(&w->journal, compress, seal);
+ if (r < 0)
+ return r;
+ else
+ log_debug("%s: Successfully rotated journal", w->journal->path);
+
+ log_debug("Retrying write.");
+ r = journal_file_append_entry(w->journal, ts, NULL,
+ iovw->iovec, iovw->count,
+ &w->seqnum, NULL, NULL);
+ if (r < 0)
+ return r;
+
+ if (w->server)
+ w->server->event_count += 1;
+ return 0;
+}
diff --git a/src/journal-remote/journal-remote-write.h b/src/journal-remote/journal-remote-write.h
new file mode 100644
index 0000000..e445859
--- /dev/null
+++ b/src/journal-remote/journal-remote-write.h
@@ -0,0 +1,39 @@
+/* SPDX-License-Identifier: LGPL-2.1+ */
+#pragma once
+
+#include "journal-file.h"
+#include "journal-importer.h"
+
+typedef struct RemoteServer RemoteServer;
+
+typedef struct Writer {
+ JournalFile *journal;
+ JournalMetrics metrics;
+
+ MMapCache *mmap;
+ RemoteServer *server;
+ char *hashmap_key;
+
+ uint64_t seqnum;
+
+ unsigned n_ref;
+} Writer;
+
+Writer* writer_new(RemoteServer* server);
+Writer* writer_ref(Writer *w);
+Writer* writer_unref(Writer *w);
+
+DEFINE_TRIVIAL_CLEANUP_FUNC(Writer*, writer_unref);
+
+int writer_write(Writer *s,
+ struct iovec_wrapper *iovw,
+ dual_timestamp *ts,
+ bool compress,
+ bool seal);
+
+typedef enum JournalWriteSplitMode {
+ JOURNAL_WRITE_SPLIT_NONE,
+ JOURNAL_WRITE_SPLIT_HOST,
+ _JOURNAL_WRITE_SPLIT_MAX,
+ _JOURNAL_WRITE_SPLIT_INVALID = -1
+} JournalWriteSplitMode;
diff --git a/src/journal-remote/journal-remote.c b/src/journal-remote/journal-remote.c
new file mode 100644
index 0000000..1da32c5
--- /dev/null
+++ b/src/journal-remote/journal-remote.c
@@ -0,0 +1,533 @@
+/* SPDX-License-Identifier: LGPL-2.1+ */
+
+#include <errno.h>
+#include <fcntl.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/prctl.h>
+#include <sys/socket.h>
+#include <stdint.h>
+
+#include "sd-daemon.h"
+
+#include "alloc-util.h"
+#include "def.h"
+#include "escape.h"
+#include "fd-util.h"
+#include "journal-file.h"
+#include "journal-remote-write.h"
+#include "journal-remote.h"
+#include "journald-native.h"
+#include "macro.h"
+#include "parse-util.h"
+#include "process-util.h"
+#include "socket-util.h"
+#include "stdio-util.h"
+#include "string-util.h"
+#include "strv.h"
+
+#define REMOTE_JOURNAL_PATH "/var/log/journal/remote"
+
+#define filename_escape(s) xescape((s), "/ ")
+
+static int open_output(RemoteServer *s, Writer *w, const char* host) {
+ _cleanup_free_ char *_filename = NULL;
+ const char *filename;
+ int r;
+
+ switch (s->split_mode) {
+ case JOURNAL_WRITE_SPLIT_NONE:
+ filename = s->output;
+ break;
+
+ case JOURNAL_WRITE_SPLIT_HOST: {
+ _cleanup_free_ char *name;
+
+ assert(host);
+
+ name = filename_escape(host);
+ if (!name)
+ return log_oom();
+
+ r = asprintf(&_filename, "%s/remote-%s.journal", s->output, name);
+ if (r < 0)
+ return log_oom();
+
+ filename = _filename;
+ break;
+ }
+
+ default:
+ assert_not_reached("what?");
+ }
+
+ r = journal_file_open_reliably(filename,
+ O_RDWR|O_CREAT, 0640,
+ s->compress, (uint64_t) -1, s->seal,
+ &w->metrics,
+ w->mmap, NULL,
+ NULL, &w->journal);
+ if (r < 0)
+ return log_error_errno(r, "Failed to open output journal %s: %m", filename);
+
+ log_debug("Opened output file %s", w->journal->path);
+ return 0;
+}
+
+/**********************************************************************
+ **********************************************************************
+ **********************************************************************/
+
+static int init_writer_hashmap(RemoteServer *s) {
+ static const struct hash_ops* const hash_ops[] = {
+ [JOURNAL_WRITE_SPLIT_NONE] = NULL,
+ [JOURNAL_WRITE_SPLIT_HOST] = &string_hash_ops,
+ };
+
+ assert(s);
+ assert(s->split_mode >= 0 && s->split_mode < (int) ELEMENTSOF(hash_ops));
+
+ s->writers = hashmap_new(hash_ops[s->split_mode]);
+ if (!s->writers)
+ return log_oom();
+
+ return 0;
+}
+
+int journal_remote_get_writer(RemoteServer *s, const char *host, Writer **writer) {
+ _cleanup_(writer_unrefp) Writer *w = NULL;
+ const void *key;
+ int r;
+
+ switch(s->split_mode) {
+ case JOURNAL_WRITE_SPLIT_NONE:
+ key = "one and only";
+ break;
+
+ case JOURNAL_WRITE_SPLIT_HOST:
+ assert(host);
+ key = host;
+ break;
+
+ default:
+ assert_not_reached("what split mode?");
+ }
+
+ w = hashmap_get(s->writers, key);
+ if (w)
+ writer_ref(w);
+ else {
+ w = writer_new(s);
+ if (!w)
+ return log_oom();
+
+ if (s->split_mode == JOURNAL_WRITE_SPLIT_HOST) {
+ w->hashmap_key = strdup(key);
+ if (!w->hashmap_key)
+ return log_oom();
+ }
+
+ r = open_output(s, w, host);
+ if (r < 0)
+ return r;
+
+ r = hashmap_put(s->writers, w->hashmap_key ?: key, w);
+ if (r < 0)
+ return r;
+ }
+
+ *writer = TAKE_PTR(w);
+
+ return 0;
+}
+
+/**********************************************************************
+ **********************************************************************
+ **********************************************************************/
+
+/* This should go away as soon as µhttpd allows state to be passed around. */
+RemoteServer *journal_remote_server_global;
+
+static int dispatch_raw_source_event(sd_event_source *event,
+ int fd,
+ uint32_t revents,
+ void *userdata);
+static int dispatch_raw_source_until_block(sd_event_source *event,
+ void *userdata);
+static int dispatch_blocking_source_event(sd_event_source *event,
+ void *userdata);
+static int dispatch_raw_connection_event(sd_event_source *event,
+ int fd,
+ uint32_t revents,
+ void *userdata);
+
+static int get_source_for_fd(RemoteServer *s,
+ int fd, char *name, RemoteSource **source) {
+ Writer *writer;
+ int r;
+
+ /* This takes ownership of name, but only on success. */
+
+ assert(fd >= 0);
+ assert(source);
+
+ if (!GREEDY_REALLOC0(s->sources, s->sources_size, fd + 1))
+ return log_oom();
+
+ r = journal_remote_get_writer(s, name, &writer);
+ if (r < 0)
+ return log_warning_errno(r, "Failed to get writer for source %s: %m",
+ name);
+
+ if (s->sources[fd] == NULL) {
+ s->sources[fd] = source_new(fd, false, name, writer);
+ if (!s->sources[fd]) {
+ writer_unref(writer);
+ return log_oom();
+ }
+
+ s->active++;
+ }
+
+ *source = s->sources[fd];
+ return 0;
+}
+
+static int remove_source(RemoteServer *s, int fd) {
+ RemoteSource *source;
+
+ assert(s);
+ assert(fd >= 0 && fd < (ssize_t) s->sources_size);
+
+ source = s->sources[fd];
+ if (source) {
+ /* this closes fd too */
+ source_free(source);
+ s->sources[fd] = NULL;
+ s->active--;
+ }
+
+ return 0;
+}
+
+int journal_remote_add_source(RemoteServer *s, int fd, char* name, bool own_name) {
+ RemoteSource *source = NULL;
+ int r;
+
+ /* This takes ownership of name, even on failure, if own_name is true. */
+
+ assert(s);
+ assert(fd >= 0);
+ assert(name);
+
+ if (!own_name) {
+ name = strdup(name);
+ if (!name)
+ return log_oom();
+ }
+
+ r = get_source_for_fd(s, fd, name, &source);
+ if (r < 0) {
+ log_error_errno(r, "Failed to create source for fd:%d (%s): %m",
+ fd, name);
+ free(name);
+ return r;
+ }
+
+ r = sd_event_add_io(s->events, &source->event,
+ fd, EPOLLIN|EPOLLRDHUP|EPOLLPRI,
+ dispatch_raw_source_event, source);
+ if (r == 0) {
+ /* Add additional source for buffer processing. It will be
+ * enabled later. */
+ r = sd_event_add_defer(s->events, &source->buffer_event,
+ dispatch_raw_source_until_block, source);
+ if (r == 0)
+ sd_event_source_set_enabled(source->buffer_event, SD_EVENT_OFF);
+ } else if (r == -EPERM) {
+ log_debug("Falling back to sd_event_add_defer for fd:%d (%s)", fd, name);
+ r = sd_event_add_defer(s->events, &source->event,
+ dispatch_blocking_source_event, source);
+ if (r == 0)
+ sd_event_source_set_enabled(source->event, SD_EVENT_ON);
+ }
+ if (r < 0) {
+ log_error_errno(r, "Failed to register event source for fd:%d: %m",
+ fd);
+ goto error;
+ }
+
+ r = sd_event_source_set_description(source->event, name);
+ if (r < 0) {
+ log_error_errno(r, "Failed to set source name for fd:%d: %m", fd);
+ goto error;
+ }
+
+ return 1; /* work to do */
+
+ error:
+ remove_source(s, fd);
+ return r;
+}
+
+int journal_remote_add_raw_socket(RemoteServer *s, int fd) {
+ int r;
+ _cleanup_close_ int fd_ = fd;
+ char name[STRLEN("raw-socket-") + DECIMAL_STR_MAX(int) + 1];
+
+ assert(fd >= 0);
+
+ r = sd_event_add_io(s->events, &s->listen_event,
+ fd, EPOLLIN,
+ dispatch_raw_connection_event, s);
+ if (r < 0)
+ return r;
+
+ xsprintf(name, "raw-socket-%d", fd);
+
+ r = sd_event_source_set_description(s->listen_event, name);
+ if (r < 0)
+ return r;
+
+ fd_ = -1;
+ s->active++;
+ return 0;
+}
+
+/**********************************************************************
+ **********************************************************************
+ **********************************************************************/
+
+int journal_remote_server_init(
+ RemoteServer *s,
+ const char *output,
+ JournalWriteSplitMode split_mode,
+ bool compress,
+ bool seal) {
+
+ int r;
+
+ assert(s);
+
+ assert(journal_remote_server_global == NULL);
+ journal_remote_server_global = s;
+
+ s->split_mode = split_mode;
+ s->compress = compress;
+ s->seal = seal;
+
+ if (output)
+ s->output = output;
+ else if (split_mode == JOURNAL_WRITE_SPLIT_NONE)
+ s->output = REMOTE_JOURNAL_PATH "/remote.journal";
+ else if (split_mode == JOURNAL_WRITE_SPLIT_HOST)
+ s->output = REMOTE_JOURNAL_PATH;
+ else
+ assert_not_reached("bad split mode");
+
+ r = sd_event_default(&s->events);
+ if (r < 0)
+ return log_error_errno(r, "Failed to allocate event loop: %m");
+
+ r = init_writer_hashmap(s);
+ if (r < 0)
+ return r;
+
+ return 0;
+}
+
+#if HAVE_MICROHTTPD
+static void MHDDaemonWrapper_free(MHDDaemonWrapper *d) {
+ MHD_stop_daemon(d->daemon);
+ sd_event_source_unref(d->io_event);
+ sd_event_source_unref(d->timer_event);
+ free(d);
+}
+#endif
+
+void journal_remote_server_destroy(RemoteServer *s) {
+ size_t i;
+
+#if HAVE_MICROHTTPD
+ hashmap_free_with_destructor(s->daemons, MHDDaemonWrapper_free);
+#endif
+
+ assert(s->sources_size == 0 || s->sources);
+ for (i = 0; i < s->sources_size; i++)
+ remove_source(s, i);
+ free(s->sources);
+
+ writer_unref(s->_single_writer);
+ hashmap_free(s->writers);
+
+ sd_event_source_unref(s->sigterm_event);
+ sd_event_source_unref(s->sigint_event);
+ sd_event_source_unref(s->listen_event);
+ sd_event_unref(s->events);
+
+ if (s == journal_remote_server_global)
+ journal_remote_server_global = NULL;
+
+ /* fds that we're listening on remain open... */
+}
+
+/**********************************************************************
+ **********************************************************************
+ **********************************************************************/
+
+int journal_remote_handle_raw_source(
+ sd_event_source *event,
+ int fd,
+ uint32_t revents,
+ RemoteServer *s) {
+
+ RemoteSource *source;
+ int r;
+
+ /* Returns 1 if there might be more data pending,
+ * 0 if data is currently exhausted, negative on error.
+ */
+
+ assert(fd >= 0 && fd < (ssize_t) s->sources_size);
+ source = s->sources[fd];
+ assert(source->importer.fd == fd);
+
+ r = process_source(source, s->compress, s->seal);
+ if (journal_importer_eof(&source->importer)) {
+ size_t remaining;
+
+ log_debug("EOF reached with source %s (fd=%d)",
+ source->importer.name, source->importer.fd);
+
+ remaining = journal_importer_bytes_remaining(&source->importer);
+ if (remaining > 0)
+ log_notice("Premature EOF. %zu bytes lost.", remaining);
+ remove_source(s, source->importer.fd);
+ log_debug("%zu active sources remaining", s->active);
+ return 0;
+ } else if (r == -E2BIG) {
+ log_notice("Entry with too many fields, skipped");
+ return 1;
+ } else if (r == -ENOBUFS) {
+ log_notice("Entry too big, skipped");
+ return 1;
+ } else if (r == -EAGAIN) {
+ return 0;
+ } else if (r < 0) {
+ log_debug_errno(r, "Closing connection: %m");
+ remove_source(s, fd);
+ return 0;
+ } else
+ return 1;
+}
+
+static int dispatch_raw_source_until_block(sd_event_source *event,
+ void *userdata) {
+ RemoteSource *source = userdata;
+ int r;
+
+ /* Make sure event stays around even if source is destroyed */
+ sd_event_source_ref(event);
+
+ r = journal_remote_handle_raw_source(event, source->importer.fd, EPOLLIN, journal_remote_server_global);
+ if (r != 1)
+ /* No more data for now */
+ sd_event_source_set_enabled(event, SD_EVENT_OFF);
+
+ sd_event_source_unref(event);
+
+ return r;
+}
+
+static int dispatch_raw_source_event(sd_event_source *event,
+ int fd,
+ uint32_t revents,
+ void *userdata) {
+ RemoteSource *source = userdata;
+ int r;
+
+ assert(source->event);
+ assert(source->buffer_event);
+
+ r = journal_remote_handle_raw_source(event, fd, EPOLLIN, journal_remote_server_global);
+ if (r == 1)
+ /* Might have more data. We need to rerun the handler
+ * until we are sure the buffer is exhausted. */
+ sd_event_source_set_enabled(source->buffer_event, SD_EVENT_ON);
+
+ return r;
+}
+
+static int dispatch_blocking_source_event(sd_event_source *event,
+ void *userdata) {
+ RemoteSource *source = userdata;
+
+ return journal_remote_handle_raw_source(event, source->importer.fd, EPOLLIN, journal_remote_server_global);
+}
+
+static int accept_connection(const char* type, int fd,
+ SocketAddress *addr, char **hostname) {
+ int fd2, r;
+
+ log_debug("Accepting new %s connection on fd:%d", type, fd);
+ fd2 = accept4(fd, &addr->sockaddr.sa, &addr->size, SOCK_NONBLOCK|SOCK_CLOEXEC);
+ if (fd2 < 0)
+ return log_error_errno(errno, "accept() on fd:%d failed: %m", fd);
+
+ switch(socket_address_family(addr)) {
+ case AF_INET:
+ case AF_INET6: {
+ _cleanup_free_ char *a = NULL;
+ char *b;
+
+ r = socket_address_print(addr, &a);
+ if (r < 0) {
+ log_error_errno(r, "socket_address_print(): %m");
+ close(fd2);
+ return r;
+ }
+
+ r = socknameinfo_pretty(&addr->sockaddr, addr->size, &b);
+ if (r < 0) {
+ log_error_errno(r, "Resolving hostname failed: %m");
+ close(fd2);
+ return r;
+ }
+
+ log_debug("Accepted %s %s connection from %s",
+ type,
+ socket_address_family(addr) == AF_INET ? "IP" : "IPv6",
+ a);
+
+ *hostname = b;
+
+ return fd2;
+ };
+ default:
+ log_error("Rejected %s connection with unsupported family %d",
+ type, socket_address_family(addr));
+ close(fd2);
+
+ return -EINVAL;
+ }
+}
+
+static int dispatch_raw_connection_event(sd_event_source *event,
+ int fd,
+ uint32_t revents,
+ void *userdata) {
+ RemoteServer *s = userdata;
+ int fd2;
+ SocketAddress addr = {
+ .size = sizeof(union sockaddr_union),
+ .type = SOCK_STREAM,
+ };
+ char *hostname = NULL;
+
+ fd2 = accept_connection("raw", fd, &addr, &hostname);
+ if (fd2 < 0)
+ return fd2;
+
+ return journal_remote_add_source(s, fd2, hostname, true);
+}
diff --git a/src/journal-remote/journal-remote.conf.in b/src/journal-remote/journal-remote.conf.in
new file mode 100644
index 0000000..edc3aba
--- /dev/null
+++ b/src/journal-remote/journal-remote.conf.in
@@ -0,0 +1,19 @@
+# This file is part of systemd.
+#
+# systemd is free software; you can redistribute it and/or modify it
+# under the terms of the GNU Lesser General Public License as published by
+# the Free Software Foundation; either version 2.1 of the License, or
+# (at your option) any later version.
+#
+# Entries in this file show the compile time defaults.
+# You can change settings by editing this file.
+# Defaults can be restored by simply deleting this file.
+#
+# See journal-remote.conf(5) for details
+
+[Remote]
+# Seal=false
+# SplitMode=host
+# ServerKeyFile=@CERTIFICATEROOT@/private/journal-remote.pem
+# ServerCertificateFile=@CERTIFICATEROOT@/certs/journal-remote.pem
+# TrustedCertificateFile=@CERTIFICATEROOT@/ca/trusted.pem
diff --git a/src/journal-remote/journal-remote.h b/src/journal-remote/journal-remote.h
new file mode 100644
index 0000000..4c25d43
--- /dev/null
+++ b/src/journal-remote/journal-remote.h
@@ -0,0 +1,65 @@
+/* SPDX-License-Identifier: LGPL-2.1+ */
+#pragma once
+
+#include "sd-event.h"
+
+#include "hashmap.h"
+#include "journal-remote-parse.h"
+#include "journal-remote-write.h"
+
+#if HAVE_MICROHTTPD
+#include "microhttpd-util.h"
+
+typedef struct MHDDaemonWrapper MHDDaemonWrapper;
+
+struct MHDDaemonWrapper {
+ uint64_t fd;
+ struct MHD_Daemon *daemon;
+
+ sd_event_source *io_event;
+ sd_event_source *timer_event;
+};
+#endif
+
+struct RemoteServer {
+ RemoteSource **sources;
+ size_t sources_size;
+ size_t active;
+
+ sd_event *events;
+ sd_event_source *sigterm_event, *sigint_event, *listen_event;
+
+ Hashmap *writers;
+ Writer *_single_writer;
+ uint64_t event_count;
+
+#if HAVE_MICROHTTPD
+ Hashmap *daemons;
+#endif
+ const char *output; /* either the output file or directory */
+
+ JournalWriteSplitMode split_mode;
+ bool compress;
+ bool seal;
+ bool check_trust;
+};
+extern RemoteServer *journal_remote_server_global;
+
+int journal_remote_server_init(
+ RemoteServer *s,
+ const char *output,
+ JournalWriteSplitMode split_mode,
+ bool compress,
+ bool seal);
+
+int journal_remote_get_writer(RemoteServer *s, const char *host, Writer **writer);
+
+int journal_remote_add_source(RemoteServer *s, int fd, char* name, bool own_name);
+int journal_remote_add_raw_socket(RemoteServer *s, int fd);
+int journal_remote_handle_raw_source(
+ sd_event_source *event,
+ int fd,
+ uint32_t revents,
+ RemoteServer *s);
+
+void journal_remote_server_destroy(RemoteServer *s);
diff --git a/src/journal-remote/journal-upload-journal.c b/src/journal-remote/journal-upload-journal.c
new file mode 100644
index 0000000..7d7e738
--- /dev/null
+++ b/src/journal-remote/journal-upload-journal.c
@@ -0,0 +1,414 @@
+/* SPDX-License-Identifier: LGPL-2.1+ */
+
+#include <curl/curl.h>
+#include <stdbool.h>
+
+#include "sd-daemon.h"
+
+#include "alloc-util.h"
+#include "journal-upload.h"
+#include "log.h"
+#include "string-util.h"
+#include "utf8.h"
+#include "util.h"
+
+/**
+ * Write up to size bytes to buf. Return negative on error, and number of
+ * bytes written otherwise. The last case is a kind of an error too.
+ */
+static ssize_t write_entry(char *buf, size_t size, Uploader *u) {
+ int r;
+ size_t pos = 0;
+
+ assert(size <= SSIZE_MAX);
+
+ for (;;) {
+
+ switch(u->entry_state) {
+ case ENTRY_CURSOR: {
+ u->current_cursor = mfree(u->current_cursor);
+
+ r = sd_journal_get_cursor(u->journal, &u->current_cursor);
+ if (r < 0)
+ return log_error_errno(r, "Failed to get cursor: %m");
+
+ r = snprintf(buf + pos, size - pos,
+ "__CURSOR=%s\n", u->current_cursor);
+ assert(r >= 0);
+ if ((size_t) r > size - pos)
+ /* not enough space */
+ return pos;
+
+ u->entry_state++;
+
+ if (pos + r == size) {
+ /* exactly one character short, but we don't need it */
+ buf[size - 1] = '\n';
+ return size;
+ }
+
+ pos += r;
+ }
+ _fallthrough_;
+ case ENTRY_REALTIME: {
+ usec_t realtime;
+
+ r = sd_journal_get_realtime_usec(u->journal, &realtime);
+ if (r < 0)
+ return log_error_errno(r, "Failed to get realtime timestamp: %m");
+
+ r = snprintf(buf + pos, size - pos,
+ "__REALTIME_TIMESTAMP="USEC_FMT"\n", realtime);
+ assert(r >= 0);
+ if ((size_t) r > size - pos)
+ /* not enough space */
+ return pos;
+
+ u->entry_state++;
+
+ if (r + pos == size) {
+ /* exactly one character short, but we don't need it */
+ buf[size - 1] = '\n';
+ return size;
+ }
+
+ pos += r;
+ }
+ _fallthrough_;
+ case ENTRY_MONOTONIC: {
+ usec_t monotonic;
+ sd_id128_t boot_id;
+
+ r = sd_journal_get_monotonic_usec(u->journal, &monotonic, &boot_id);
+ if (r < 0)
+ return log_error_errno(r, "Failed to get monotonic timestamp: %m");
+
+ r = snprintf(buf + pos, size - pos,
+ "__MONOTONIC_TIMESTAMP="USEC_FMT"\n", monotonic);
+ assert(r >= 0);
+ if ((size_t) r > size - pos)
+ /* not enough space */
+ return pos;
+
+ u->entry_state++;
+
+ if (r + pos == size) {
+ /* exactly one character short, but we don't need it */
+ buf[size - 1] = '\n';
+ return size;
+ }
+
+ pos += r;
+ }
+ _fallthrough_;
+ case ENTRY_BOOT_ID: {
+ sd_id128_t boot_id;
+ char sid[33];
+
+ r = sd_journal_get_monotonic_usec(u->journal, NULL, &boot_id);
+ if (r < 0)
+ return log_error_errno(r, "Failed to get monotonic timestamp: %m");
+
+ r = snprintf(buf + pos, size - pos,
+ "_BOOT_ID=%s\n", sd_id128_to_string(boot_id, sid));
+ assert(r >= 0);
+ if ((size_t) r > size - pos)
+ /* not enough space */
+ return pos;
+
+ u->entry_state++;
+
+ if (r + pos == size) {
+ /* exactly one character short, but we don't need it */
+ buf[size - 1] = '\n';
+ return size;
+ }
+
+ pos += r;
+ }
+ _fallthrough_;
+ case ENTRY_NEW_FIELD: {
+ u->field_pos = 0;
+
+ r = sd_journal_enumerate_data(u->journal,
+ &u->field_data,
+ &u->field_length);
+ if (r < 0)
+ return log_error_errno(r, "Failed to move to next field in entry: %m");
+ else if (r == 0) {
+ u->entry_state = ENTRY_OUTRO;
+ continue;
+ }
+
+ /* We already printed the boot id from the data in
+ * the header, hence let's suppress it here */
+ if (memory_startswith(u->field_data, u->field_length, "_BOOT_ID="))
+ continue;
+
+ if (!utf8_is_printable_newline(u->field_data, u->field_length, false)) {
+ u->entry_state = ENTRY_BINARY_FIELD_START;
+ continue;
+ }
+
+ u->entry_state++;
+ }
+ _fallthrough_;
+ case ENTRY_TEXT_FIELD:
+ case ENTRY_BINARY_FIELD: {
+ bool done;
+ size_t tocopy;
+
+ done = size - pos > u->field_length - u->field_pos;
+ if (done)
+ tocopy = u->field_length - u->field_pos;
+ else
+ tocopy = size - pos;
+
+ memcpy(buf + pos,
+ (char*) u->field_data + u->field_pos,
+ tocopy);
+
+ if (done) {
+ buf[pos + tocopy] = '\n';
+ pos += tocopy + 1;
+ u->entry_state = ENTRY_NEW_FIELD;
+ continue;
+ } else {
+ u->field_pos += tocopy;
+ return size;
+ }
+ }
+
+ case ENTRY_BINARY_FIELD_START: {
+ const char *c;
+ size_t len;
+
+ c = memchr(u->field_data, '=', u->field_length);
+ if (!c || c == u->field_data)
+ return log_error_errno(SYNTHETIC_ERRNO(EINVAL),
+ "Invalid field.");
+
+ len = c - (const char*)u->field_data;
+
+ /* need space for label + '\n' */
+ if (size - pos < len + 1)
+ return pos;
+
+ memcpy(buf + pos, u->field_data, len);
+ buf[pos + len] = '\n';
+ pos += len + 1;
+
+ u->field_pos = len + 1;
+ u->entry_state++;
+ }
+ _fallthrough_;
+ case ENTRY_BINARY_FIELD_SIZE: {
+ uint64_t le64;
+
+ /* need space for uint64_t */
+ if (size - pos < 8)
+ return pos;
+
+ le64 = htole64(u->field_length - u->field_pos);
+ memcpy(buf + pos, &le64, 8);
+ pos += 8;
+
+ u->entry_state++;
+ continue;
+ }
+
+ case ENTRY_OUTRO:
+ /* need space for '\n' */
+ if (size - pos < 1)
+ return pos;
+
+ buf[pos++] = '\n';
+ u->entry_state++;
+ u->entries_sent++;
+
+ return pos;
+
+ default:
+ assert_not_reached("WTF?");
+ }
+ }
+ assert_not_reached("WTF?");
+}
+
+static void check_update_watchdog(Uploader *u) {
+ usec_t after;
+ usec_t elapsed_time;
+
+ if (u->watchdog_usec <= 0)
+ return;
+
+ after = now(CLOCK_MONOTONIC);
+ elapsed_time = usec_sub_unsigned(after, u->watchdog_timestamp);
+ if (elapsed_time > u->watchdog_usec / 2) {
+ log_debug("Update watchdog timer");
+ sd_notify(false, "WATCHDOG=1");
+ u->watchdog_timestamp = after;
+ }
+}
+
+static size_t journal_input_callback(void *buf, size_t size, size_t nmemb, void *userp) {
+ Uploader *u = userp;
+ int r;
+ sd_journal *j;
+ size_t filled = 0;
+ ssize_t w;
+
+ assert(u);
+ assert(nmemb <= SSIZE_MAX / size);
+
+ check_update_watchdog(u);
+
+ j = u->journal;
+
+ while (j && filled < size * nmemb) {
+ if (u->entry_state == ENTRY_DONE) {
+ r = sd_journal_next(j);
+ if (r < 0) {
+ log_error_errno(r, "Failed to move to next entry in journal: %m");
+ return CURL_READFUNC_ABORT;
+ } else if (r == 0) {
+ if (u->input_event)
+ log_debug("No more entries, waiting for journal.");
+ else {
+ log_info("No more entries, closing journal.");
+ close_journal_input(u);
+ }
+
+ u->uploading = false;
+
+ break;
+ }
+
+ u->entry_state = ENTRY_CURSOR;
+ }
+
+ w = write_entry((char*)buf + filled, size * nmemb - filled, u);
+ if (w < 0)
+ return CURL_READFUNC_ABORT;
+ filled += w;
+
+ if (filled == 0) {
+ log_error("Buffer space is too small to write entry.");
+ return CURL_READFUNC_ABORT;
+ } else if (u->entry_state != ENTRY_DONE)
+ /* This means that all available space was used up */
+ break;
+
+ log_debug("Entry %zu (%s) has been uploaded.",
+ u->entries_sent, u->current_cursor);
+ }
+
+ return filled;
+}
+
+void close_journal_input(Uploader *u) {
+ assert(u);
+
+ if (u->journal) {
+ log_debug("Closing journal input.");
+
+ sd_journal_close(u->journal);
+ u->journal = NULL;
+ }
+ u->timeout = 0;
+}
+
+static int process_journal_input(Uploader *u, int skip) {
+ int r;
+
+ if (u->uploading)
+ return 0;
+
+ r = sd_journal_next_skip(u->journal, skip);
+ if (r < 0)
+ return log_error_errno(r, "Failed to skip to next entry: %m");
+ else if (r < skip)
+ return 0;
+
+ /* have data */
+ u->entry_state = ENTRY_CURSOR;
+ return start_upload(u, journal_input_callback, u);
+}
+
+int check_journal_input(Uploader *u) {
+ if (u->input_event) {
+ int r;
+
+ r = sd_journal_process(u->journal);
+ if (r < 0) {
+ log_error_errno(r, "Failed to process journal: %m");
+ close_journal_input(u);
+ return r;
+ }
+
+ if (r == SD_JOURNAL_NOP)
+ return 0;
+ }
+
+ return process_journal_input(u, 1);
+}
+
+static int dispatch_journal_input(sd_event_source *event,
+ int fd,
+ uint32_t revents,
+ void *userp) {
+ Uploader *u = userp;
+
+ assert(u);
+
+ if (u->uploading)
+ return 0;
+
+ log_debug("Detected journal input, checking for new data.");
+ return check_journal_input(u);
+}
+
+int open_journal_for_upload(Uploader *u,
+ sd_journal *j,
+ const char *cursor,
+ bool after_cursor,
+ bool follow) {
+ int fd, r, events;
+
+ u->journal = j;
+
+ sd_journal_set_data_threshold(j, 0);
+
+ if (follow) {
+ fd = sd_journal_get_fd(j);
+ if (fd < 0)
+ return log_error_errno(fd, "sd_journal_get_fd failed: %m");
+
+ events = sd_journal_get_events(j);
+
+ r = sd_journal_reliable_fd(j);
+ assert(r >= 0);
+ if (r > 0)
+ u->timeout = -1;
+ else
+ u->timeout = JOURNAL_UPLOAD_POLL_TIMEOUT;
+
+ r = sd_event_add_io(u->events, &u->input_event,
+ fd, events, dispatch_journal_input, u);
+ if (r < 0)
+ return log_error_errno(r, "Failed to register input event: %m");
+
+ log_debug("Listening for journal events on fd:%d, timeout %d",
+ fd, u->timeout == (uint64_t) -1 ? -1 : (int) u->timeout);
+ } else
+ log_debug("Not listening for journal events.");
+
+ if (cursor) {
+ r = sd_journal_seek_cursor(j, cursor);
+ if (r < 0)
+ return log_error_errno(r, "Failed to seek to cursor %s: %m",
+ cursor);
+ }
+
+ return process_journal_input(u, 1 + !!after_cursor);
+}
diff --git a/src/journal-remote/journal-upload.c b/src/journal-remote/journal-upload.c
new file mode 100644
index 0000000..ef3556f
--- /dev/null
+++ b/src/journal-remote/journal-upload.c
@@ -0,0 +1,854 @@
+/* SPDX-License-Identifier: LGPL-2.1+ */
+
+#include <curl/curl.h>
+#include <fcntl.h>
+#include <getopt.h>
+#include <stdio.h>
+#include <sys/stat.h>
+
+#include "sd-daemon.h"
+
+#include "alloc-util.h"
+#include "build.h"
+#include "conf-parser.h"
+#include "daemon-util.h"
+#include "def.h"
+#include "env-file.h"
+#include "fd-util.h"
+#include "fileio.h"
+#include "format-util.h"
+#include "glob-util.h"
+#include "journal-upload.h"
+#include "log.h"
+#include "main-func.h"
+#include "mkdir.h"
+#include "parse-util.h"
+#include "pretty-print.h"
+#include "process-util.h"
+#include "rlimit-util.h"
+#include "sigbus.h"
+#include "signal-util.h"
+#include "string-util.h"
+#include "strv.h"
+#include "tmpfile-util.h"
+#include "util.h"
+
+#define PRIV_KEY_FILE CERTIFICATE_ROOT "/private/journal-upload.pem"
+#define CERT_FILE CERTIFICATE_ROOT "/certs/journal-upload.pem"
+#define TRUST_FILE CERTIFICATE_ROOT "/ca/trusted.pem"
+#define DEFAULT_PORT 19532
+
+static const char* arg_url = NULL;
+static const char *arg_key = NULL;
+static const char *arg_cert = NULL;
+static const char *arg_trust = NULL;
+static const char *arg_directory = NULL;
+static char **arg_file = NULL;
+static const char *arg_cursor = NULL;
+static bool arg_after_cursor = false;
+static int arg_journal_type = 0;
+static const char *arg_machine = NULL;
+static bool arg_merge = false;
+static int arg_follow = -1;
+static const char *arg_save_state = NULL;
+
+static void close_fd_input(Uploader *u);
+
+#define SERVER_ANSWER_KEEP 2048
+
+#define STATE_FILE "/var/lib/systemd/journal-upload/state"
+
+#define easy_setopt(curl, opt, value, level, cmd) \
+ do { \
+ code = curl_easy_setopt(curl, opt, value); \
+ if (code) { \
+ log_full(level, \
+ "curl_easy_setopt " #opt " failed: %s", \
+ curl_easy_strerror(code)); \
+ cmd; \
+ } \
+ } while (0)
+
+static size_t output_callback(char *buf,
+ size_t size,
+ size_t nmemb,
+ void *userp) {
+ Uploader *u = userp;
+
+ assert(u);
+
+ log_debug("The server answers (%zu bytes): %.*s",
+ size*nmemb, (int)(size*nmemb), buf);
+
+ if (nmemb && !u->answer) {
+ u->answer = strndup(buf, size*nmemb);
+ if (!u->answer)
+ log_warning("Failed to store server answer (%zu bytes): out of memory", size*nmemb);
+ }
+
+ return size * nmemb;
+}
+
+static int check_cursor_updating(Uploader *u) {
+ _cleanup_free_ char *temp_path = NULL;
+ _cleanup_fclose_ FILE *f = NULL;
+ int r;
+
+ if (!u->state_file)
+ return 0;
+
+ r = mkdir_parents(u->state_file, 0755);
+ if (r < 0)
+ return log_error_errno(r, "Cannot create parent directory of state file %s: %m",
+ u->state_file);
+
+ r = fopen_temporary(u->state_file, &f, &temp_path);
+ if (r < 0)
+ return log_error_errno(r, "Cannot save state to %s: %m",
+ u->state_file);
+ unlink(temp_path);
+
+ return 0;
+}
+
+static int update_cursor_state(Uploader *u) {
+ _cleanup_free_ char *temp_path = NULL;
+ _cleanup_fclose_ FILE *f = NULL;
+ int r;
+
+ if (!u->state_file || !u->last_cursor)
+ return 0;
+
+ r = fopen_temporary(u->state_file, &f, &temp_path);
+ if (r < 0)
+ goto fail;
+
+ fprintf(f,
+ "# This is private data. Do not parse.\n"
+ "LAST_CURSOR=%s\n",
+ u->last_cursor);
+
+ r = fflush_and_check(f);
+ if (r < 0)
+ goto fail;
+
+ if (rename(temp_path, u->state_file) < 0) {
+ r = -errno;
+ goto fail;
+ }
+
+ return 0;
+
+fail:
+ if (temp_path)
+ (void) unlink(temp_path);
+
+ (void) unlink(u->state_file);
+
+ return log_error_errno(r, "Failed to save state %s: %m", u->state_file);
+}
+
+static int load_cursor_state(Uploader *u) {
+ int r;
+
+ if (!u->state_file)
+ return 0;
+
+ r = parse_env_file(NULL, u->state_file, "LAST_CURSOR", &u->last_cursor);
+ if (r == -ENOENT)
+ log_debug("State file %s is not present.", u->state_file);
+ else if (r < 0)
+ return log_error_errno(r, "Failed to read state file %s: %m",
+ u->state_file);
+ else
+ log_debug("Last cursor was %s", u->last_cursor);
+
+ return 0;
+}
+
+int start_upload(Uploader *u,
+ size_t (*input_callback)(void *ptr,
+ size_t size,
+ size_t nmemb,
+ void *userdata),
+ void *data) {
+ CURLcode code;
+
+ assert(u);
+ assert(input_callback);
+
+ if (!u->header) {
+ struct curl_slist *h;
+
+ h = curl_slist_append(NULL, "Content-Type: application/vnd.fdo.journal");
+ if (!h)
+ return log_oom();
+
+ h = curl_slist_append(h, "Transfer-Encoding: chunked");
+ if (!h) {
+ curl_slist_free_all(h);
+ return log_oom();
+ }
+
+ h = curl_slist_append(h, "Accept: text/plain");
+ if (!h) {
+ curl_slist_free_all(h);
+ return log_oom();
+ }
+
+ u->header = h;
+ }
+
+ if (!u->easy) {
+ CURL *curl;
+
+ curl = curl_easy_init();
+ if (!curl)
+ return log_error_errno(SYNTHETIC_ERRNO(ENOSR),
+ "Call to curl_easy_init failed.");
+
+ /* tell it to POST to the URL */
+ easy_setopt(curl, CURLOPT_POST, 1L,
+ LOG_ERR, return -EXFULL);
+
+ easy_setopt(curl, CURLOPT_ERRORBUFFER, u->error,
+ LOG_ERR, return -EXFULL);
+
+ /* set where to write to */
+ easy_setopt(curl, CURLOPT_WRITEFUNCTION, output_callback,
+ LOG_ERR, return -EXFULL);
+
+ easy_setopt(curl, CURLOPT_WRITEDATA, data,
+ LOG_ERR, return -EXFULL);
+
+ /* set where to read from */
+ easy_setopt(curl, CURLOPT_READFUNCTION, input_callback,
+ LOG_ERR, return -EXFULL);
+
+ easy_setopt(curl, CURLOPT_READDATA, data,
+ LOG_ERR, return -EXFULL);
+
+ /* use our special own mime type and chunked transfer */
+ easy_setopt(curl, CURLOPT_HTTPHEADER, u->header,
+ LOG_ERR, return -EXFULL);
+
+ if (DEBUG_LOGGING)
+ /* enable verbose for easier tracing */
+ easy_setopt(curl, CURLOPT_VERBOSE, 1L, LOG_WARNING, );
+
+ easy_setopt(curl, CURLOPT_USERAGENT,
+ "systemd-journal-upload " GIT_VERSION,
+ LOG_WARNING, );
+
+ if (arg_key || startswith(u->url, "https://")) {
+ easy_setopt(curl, CURLOPT_SSLKEY, arg_key ?: PRIV_KEY_FILE,
+ LOG_ERR, return -EXFULL);
+ easy_setopt(curl, CURLOPT_SSLCERT, arg_cert ?: CERT_FILE,
+ LOG_ERR, return -EXFULL);
+ }
+
+ if (streq_ptr(arg_trust, "all"))
+ easy_setopt(curl, CURLOPT_SSL_VERIFYPEER, 0,
+ LOG_ERR, return -EUCLEAN);
+ else if (arg_trust || startswith(u->url, "https://"))
+ easy_setopt(curl, CURLOPT_CAINFO, arg_trust ?: TRUST_FILE,
+ LOG_ERR, return -EXFULL);
+
+ if (arg_key || arg_trust)
+ easy_setopt(curl, CURLOPT_SSLVERSION, CURL_SSLVERSION_TLSv1,
+ LOG_WARNING, );
+
+ u->easy = curl;
+ } else {
+ /* truncate the potential old error message */
+ u->error[0] = '\0';
+
+ free(u->answer);
+ u->answer = 0;
+ }
+
+ /* upload to this place */
+ code = curl_easy_setopt(u->easy, CURLOPT_URL, u->url);
+ if (code)
+ return log_error_errno(SYNTHETIC_ERRNO(EXFULL),
+ "curl_easy_setopt CURLOPT_URL failed: %s",
+ curl_easy_strerror(code));
+
+ u->uploading = true;
+
+ return 0;
+}
+
+static size_t fd_input_callback(void *buf, size_t size, size_t nmemb, void *userp) {
+ Uploader *u = userp;
+ ssize_t n;
+
+ assert(u);
+ assert(nmemb < SSIZE_MAX / size);
+
+ if (u->input < 0)
+ return 0;
+
+ assert(!size_multiply_overflow(size, nmemb));
+
+ n = read(u->input, buf, size * nmemb);
+ log_debug("%s: allowed %zu, read %zd", __func__, size*nmemb, n);
+ if (n > 0)
+ return n;
+
+ u->uploading = false;
+ if (n < 0) {
+ log_error_errno(errno, "Aborting transfer after read error on input: %m.");
+ return CURL_READFUNC_ABORT;
+ }
+
+ log_debug("Reached EOF");
+ close_fd_input(u);
+ return 0;
+}
+
+static void close_fd_input(Uploader *u) {
+ assert(u);
+
+ u->input = safe_close(u->input);
+ u->timeout = 0;
+}
+
+static int dispatch_fd_input(sd_event_source *event,
+ int fd,
+ uint32_t revents,
+ void *userp) {
+ Uploader *u = userp;
+
+ assert(u);
+ assert(fd >= 0);
+
+ if (revents & EPOLLHUP) {
+ log_debug("Received HUP");
+ close_fd_input(u);
+ return 0;
+ }
+
+ if (!(revents & EPOLLIN)) {
+ log_warning("Unexpected poll event %"PRIu32".", revents);
+ return -EINVAL;
+ }
+
+ if (u->uploading) {
+ log_warning("dispatch_fd_input called when uploading, ignoring.");
+ return 0;
+ }
+
+ return start_upload(u, fd_input_callback, u);
+}
+
+static int open_file_for_upload(Uploader *u, const char *filename) {
+ int fd, r = 0;
+
+ if (streq(filename, "-"))
+ fd = STDIN_FILENO;
+ else {
+ fd = open(filename, O_RDONLY|O_CLOEXEC|O_NOCTTY);
+ if (fd < 0)
+ return log_error_errno(errno, "Failed to open %s: %m", filename);
+ }
+
+ u->input = fd;
+
+ if (arg_follow) {
+ r = sd_event_add_io(u->events, &u->input_event,
+ fd, EPOLLIN, dispatch_fd_input, u);
+ if (r < 0) {
+ if (r != -EPERM || arg_follow > 0)
+ return log_error_errno(r, "Failed to register input event: %m");
+
+ /* Normal files should just be consumed without polling. */
+ r = start_upload(u, fd_input_callback, u);
+ }
+ }
+
+ return r;
+}
+
+static int dispatch_sigterm(sd_event_source *event,
+ const struct signalfd_siginfo *si,
+ void *userdata) {
+ Uploader *u = userdata;
+
+ assert(u);
+
+ log_received_signal(LOG_INFO, si);
+
+ close_fd_input(u);
+ close_journal_input(u);
+
+ sd_event_exit(u->events, 0);
+ return 0;
+}
+
+static int setup_signals(Uploader *u) {
+ int r;
+
+ assert(u);
+
+ assert_se(sigprocmask_many(SIG_SETMASK, NULL, SIGINT, SIGTERM, -1) >= 0);
+
+ r = sd_event_add_signal(u->events, &u->sigterm_event, SIGTERM, dispatch_sigterm, u);
+ if (r < 0)
+ return r;
+
+ r = sd_event_add_signal(u->events, &u->sigint_event, SIGINT, dispatch_sigterm, u);
+ if (r < 0)
+ return r;
+
+ return 0;
+}
+
+static int setup_uploader(Uploader *u, const char *url, const char *state_file) {
+ int r;
+ const char *host, *proto = "";
+
+ assert(u);
+ assert(url);
+
+ *u = (Uploader) {
+ .input = -1
+ };
+
+ host = STARTSWITH_SET(url, "http://", "https://");
+ if (!host) {
+ host = url;
+ proto = "https://";
+ }
+
+ if (strchr(host, ':'))
+ u->url = strjoin(proto, url, "/upload");
+ else {
+ char *t;
+ size_t x;
+
+ t = strdupa(url);
+ x = strlen(t);
+ while (x > 0 && t[x - 1] == '/')
+ t[x - 1] = '\0';
+
+ u->url = strjoin(proto, t, ":" STRINGIFY(DEFAULT_PORT), "/upload");
+ }
+ if (!u->url)
+ return log_oom();
+
+ u->state_file = state_file;
+
+ r = sd_event_default(&u->events);
+ if (r < 0)
+ return log_error_errno(r, "sd_event_default failed: %m");
+
+ r = setup_signals(u);
+ if (r < 0)
+ return log_error_errno(r, "Failed to set up signals: %m");
+
+ (void) sd_watchdog_enabled(false, &u->watchdog_usec);
+
+ return load_cursor_state(u);
+}
+
+static void destroy_uploader(Uploader *u) {
+ assert(u);
+
+ curl_easy_cleanup(u->easy);
+ curl_slist_free_all(u->header);
+ free(u->answer);
+
+ free(u->last_cursor);
+ free(u->current_cursor);
+
+ free(u->url);
+
+ u->input_event = sd_event_source_unref(u->input_event);
+
+ close_fd_input(u);
+ close_journal_input(u);
+
+ sd_event_source_unref(u->sigterm_event);
+ sd_event_source_unref(u->sigint_event);
+ sd_event_unref(u->events);
+}
+
+static int perform_upload(Uploader *u) {
+ CURLcode code;
+ long status;
+
+ assert(u);
+
+ u->watchdog_timestamp = now(CLOCK_MONOTONIC);
+ code = curl_easy_perform(u->easy);
+ if (code) {
+ if (u->error[0])
+ log_error("Upload to %s failed: %.*s",
+ u->url, (int) sizeof(u->error), u->error);
+ else
+ log_error("Upload to %s failed: %s",
+ u->url, curl_easy_strerror(code));
+ return -EIO;
+ }
+
+ code = curl_easy_getinfo(u->easy, CURLINFO_RESPONSE_CODE, &status);
+ if (code)
+ return log_error_errno(SYNTHETIC_ERRNO(EUCLEAN),
+ "Failed to retrieve response code: %s",
+ curl_easy_strerror(code));
+
+ if (status >= 300)
+ return log_error_errno(SYNTHETIC_ERRNO(EIO),
+ "Upload to %s failed with code %ld: %s",
+ u->url, status, strna(u->answer));
+ else if (status < 200)
+ return log_error_errno(SYNTHETIC_ERRNO(EIO),
+ "Upload to %s finished with unexpected code %ld: %s",
+ u->url, status, strna(u->answer));
+ else
+ log_debug("Upload finished successfully with code %ld: %s",
+ status, strna(u->answer));
+
+ free_and_replace(u->last_cursor, u->current_cursor);
+
+ return update_cursor_state(u);
+}
+
+static int parse_config(void) {
+ const ConfigTableItem items[] = {
+ { "Upload", "URL", config_parse_string, 0, &arg_url },
+ { "Upload", "ServerKeyFile", config_parse_path, 0, &arg_key },
+ { "Upload", "ServerCertificateFile", config_parse_path, 0, &arg_cert },
+ { "Upload", "TrustedCertificateFile", config_parse_path, 0, &arg_trust },
+ {}};
+
+ return config_parse_many_nulstr(PKGSYSCONFDIR "/journal-upload.conf",
+ CONF_PATHS_NULSTR("systemd/journal-upload.conf.d"),
+ "Upload\0", config_item_table_lookup, items,
+ CONFIG_PARSE_WARN, NULL);
+}
+
+static int help(void) {
+ _cleanup_free_ char *link = NULL;
+ int r;
+
+ r = terminal_urlify_man("systemd-journal-upload.service", "8", &link);
+ if (r < 0)
+ return log_oom();
+
+ printf("%s -u URL {FILE|-}...\n\n"
+ "Upload journal events to a remote server.\n\n"
+ " -h --help Show this help\n"
+ " --version Show package version\n"
+ " -u --url=URL Upload to this address (default port "
+ STRINGIFY(DEFAULT_PORT) ")\n"
+ " --key=FILENAME Specify key in PEM format (default:\n"
+ " \"" PRIV_KEY_FILE "\")\n"
+ " --cert=FILENAME Specify certificate in PEM format (default:\n"
+ " \"" CERT_FILE "\")\n"
+ " --trust=FILENAME|all Specify CA certificate or disable checking (default:\n"
+ " \"" TRUST_FILE "\")\n"
+ " --system Use the system journal\n"
+ " --user Use the user journal for the current user\n"
+ " -m --merge Use all available journals\n"
+ " -M --machine=CONTAINER Operate on local container\n"
+ " -D --directory=PATH Use journal files from directory\n"
+ " --file=PATH Use this journal file\n"
+ " --cursor=CURSOR Start at the specified cursor\n"
+ " --after-cursor=CURSOR Start after the specified cursor\n"
+ " --follow[=BOOL] Do [not] wait for input\n"
+ " --save-state[=FILE] Save uploaded cursors (default \n"
+ " " STATE_FILE ")\n"
+ "\nSee the %s for details.\n"
+ , program_invocation_short_name
+ , link
+ );
+
+ return 0;
+}
+
+static int parse_argv(int argc, char *argv[]) {
+ enum {
+ ARG_VERSION = 0x100,
+ ARG_KEY,
+ ARG_CERT,
+ ARG_TRUST,
+ ARG_USER,
+ ARG_SYSTEM,
+ ARG_FILE,
+ ARG_CURSOR,
+ ARG_AFTER_CURSOR,
+ ARG_FOLLOW,
+ ARG_SAVE_STATE,
+ };
+
+ static const struct option options[] = {
+ { "help", no_argument, NULL, 'h' },
+ { "version", no_argument, NULL, ARG_VERSION },
+ { "url", required_argument, NULL, 'u' },
+ { "key", required_argument, NULL, ARG_KEY },
+ { "cert", required_argument, NULL, ARG_CERT },
+ { "trust", required_argument, NULL, ARG_TRUST },
+ { "system", no_argument, NULL, ARG_SYSTEM },
+ { "user", no_argument, NULL, ARG_USER },
+ { "merge", no_argument, NULL, 'm' },
+ { "machine", required_argument, NULL, 'M' },
+ { "directory", required_argument, NULL, 'D' },
+ { "file", required_argument, NULL, ARG_FILE },
+ { "cursor", required_argument, NULL, ARG_CURSOR },
+ { "after-cursor", required_argument, NULL, ARG_AFTER_CURSOR },
+ { "follow", optional_argument, NULL, ARG_FOLLOW },
+ { "save-state", optional_argument, NULL, ARG_SAVE_STATE },
+ {}
+ };
+
+ int c, r;
+
+ assert(argc >= 0);
+ assert(argv);
+
+ opterr = 0;
+
+ while ((c = getopt_long(argc, argv, "hu:mM:D:", options, NULL)) >= 0)
+ switch(c) {
+ case 'h':
+ return help();
+
+ case ARG_VERSION:
+ return version();
+
+ case 'u':
+ if (arg_url)
+ return log_error_errno(SYNTHETIC_ERRNO(EINVAL),
+ "cannot use more than one --url");
+
+ arg_url = optarg;
+ break;
+
+ case ARG_KEY:
+ if (arg_key)
+ return log_error_errno(SYNTHETIC_ERRNO(EINVAL),
+ "cannot use more than one --key");
+
+ arg_key = optarg;
+ break;
+
+ case ARG_CERT:
+ if (arg_cert)
+ return log_error_errno(SYNTHETIC_ERRNO(EINVAL),
+ "cannot use more than one --cert");
+
+ arg_cert = optarg;
+ break;
+
+ case ARG_TRUST:
+ if (arg_trust)
+ return log_error_errno(SYNTHETIC_ERRNO(EINVAL),
+ "cannot use more than one --trust");
+
+ arg_trust = optarg;
+ break;
+
+ case ARG_SYSTEM:
+ arg_journal_type |= SD_JOURNAL_SYSTEM;
+ break;
+
+ case ARG_USER:
+ arg_journal_type |= SD_JOURNAL_CURRENT_USER;
+ break;
+
+ case 'm':
+ arg_merge = true;
+ break;
+
+ case 'M':
+ if (arg_machine)
+ return log_error_errno(SYNTHETIC_ERRNO(EINVAL),
+ "cannot use more than one --machine/-M");
+
+ arg_machine = optarg;
+ break;
+
+ case 'D':
+ if (arg_directory)
+ return log_error_errno(SYNTHETIC_ERRNO(EINVAL),
+ "cannot use more than one --directory/-D");
+
+ arg_directory = optarg;
+ break;
+
+ case ARG_FILE:
+ r = glob_extend(&arg_file, optarg);
+ if (r < 0)
+ return log_error_errno(r, "Failed to add paths: %m");
+ break;
+
+ case ARG_CURSOR:
+ if (arg_cursor)
+ return log_error_errno(SYNTHETIC_ERRNO(EINVAL),
+ "cannot use more than one --cursor/--after-cursor");
+
+ arg_cursor = optarg;
+ break;
+
+ case ARG_AFTER_CURSOR:
+ if (arg_cursor)
+ return log_error_errno(SYNTHETIC_ERRNO(EINVAL),
+ "cannot use more than one --cursor/--after-cursor");
+
+ arg_cursor = optarg;
+ arg_after_cursor = true;
+ break;
+
+ case ARG_FOLLOW:
+ if (optarg) {
+ r = parse_boolean(optarg);
+ if (r < 0)
+ return log_error_errno(SYNTHETIC_ERRNO(EINVAL),
+ "Failed to parse --follow= parameter.");
+
+ arg_follow = !!r;
+ } else
+ arg_follow = true;
+
+ break;
+
+ case ARG_SAVE_STATE:
+ arg_save_state = optarg ?: STATE_FILE;
+ break;
+
+ case '?':
+ return log_error_errno(SYNTHETIC_ERRNO(EINVAL),
+ "Unknown option %s.",
+ argv[optind - 1]);
+
+ case ':':
+ return log_error_errno(SYNTHETIC_ERRNO(EINVAL),
+ "Missing argument to %s.",
+ argv[optind - 1]);
+
+ default:
+ assert_not_reached("Unhandled option code.");
+ }
+
+ if (!arg_url)
+ return log_error_errno(SYNTHETIC_ERRNO(EINVAL),
+ "Required --url/-u option missing.");
+
+ if (!!arg_key != !!arg_cert)
+ return log_error_errno(SYNTHETIC_ERRNO(EINVAL),
+ "Options --key and --cert must be used together.");
+
+ if (optind < argc && (arg_directory || arg_file || arg_machine || arg_journal_type))
+ return log_error_errno(SYNTHETIC_ERRNO(EINVAL),
+ "Input arguments make no sense with journal input.");
+
+ return 1;
+}
+
+static int open_journal(sd_journal **j) {
+ int r;
+
+ if (arg_directory)
+ r = sd_journal_open_directory(j, arg_directory, arg_journal_type);
+ else if (arg_file)
+ r = sd_journal_open_files(j, (const char**) arg_file, 0);
+ else if (arg_machine)
+ r = sd_journal_open_container(j, arg_machine, 0);
+ else
+ r = sd_journal_open(j, !arg_merge*SD_JOURNAL_LOCAL_ONLY + arg_journal_type);
+ if (r < 0)
+ log_error_errno(r, "Failed to open %s: %m",
+ arg_directory ? arg_directory : arg_file ? "files" : "journal");
+ return r;
+}
+
+static int run(int argc, char **argv) {
+ _cleanup_(notify_on_cleanup) const char *notify_message = NULL;
+ _cleanup_(destroy_uploader) Uploader u = {};
+ bool use_journal;
+ int r;
+
+ log_show_color(true);
+ log_parse_environment();
+
+ /* The journal merging logic potentially needs a lot of fds. */
+ (void) rlimit_nofile_bump(HIGH_RLIMIT_NOFILE);
+
+ r = parse_config();
+ if (r < 0)
+ return r;
+
+ r = parse_argv(argc, argv);
+ if (r <= 0)
+ return r;
+
+ sigbus_install();
+
+ r = setup_uploader(&u, arg_url, arg_save_state);
+ if (r < 0)
+ return r;
+
+ sd_event_set_watchdog(u.events, true);
+
+ r = check_cursor_updating(&u);
+ if (r < 0)
+ return r;
+
+ log_debug("%s running as pid "PID_FMT,
+ program_invocation_short_name, getpid_cached());
+
+ use_journal = optind >= argc;
+ if (use_journal) {
+ sd_journal *j;
+ r = open_journal(&j);
+ if (r < 0)
+ return r;
+ r = open_journal_for_upload(&u, j,
+ arg_cursor ?: u.last_cursor,
+ arg_cursor ? arg_after_cursor : true,
+ !!arg_follow);
+ if (r < 0)
+ return r;
+ }
+
+ notify_message = notify_start("READY=1\n"
+ "STATUS=Processing input...",
+ NOTIFY_STOPPING);
+
+ for (;;) {
+ r = sd_event_get_state(u.events);
+ if (r < 0)
+ return r;
+ if (r == SD_EVENT_FINISHED)
+ return 0;
+
+ if (use_journal) {
+ if (!u.journal)
+ return 0;
+
+ r = check_journal_input(&u);
+ } else if (u.input < 0 && !use_journal) {
+ if (optind >= argc)
+ return 0;
+
+ log_debug("Using %s as input.", argv[optind]);
+ r = open_file_for_upload(&u, argv[optind++]);
+ }
+ if (r < 0)
+ return r;
+
+ if (u.uploading) {
+ r = perform_upload(&u);
+ if (r < 0)
+ return r;
+ }
+
+ r = sd_event_run(u.events, u.timeout);
+ if (r < 0)
+ return log_error_errno(r, "Failed to run event loop: %m");
+ }
+}
+
+DEFINE_MAIN_FUNCTION(run);
diff --git a/src/journal-remote/journal-upload.conf.in b/src/journal-remote/journal-upload.conf.in
new file mode 100644
index 0000000..5f59a6f
--- /dev/null
+++ b/src/journal-remote/journal-upload.conf.in
@@ -0,0 +1,18 @@
+# This file is part of systemd.
+#
+# systemd is free software; you can redistribute it and/or modify it
+# under the terms of the GNU Lesser General Public License as published by
+# the Free Software Foundation; either version 2.1 of the License, or
+# (at your option) any later version.
+#
+# Entries in this file show the compile time defaults.
+# You can change settings by editing this file.
+# Defaults can be restored by simply deleting this file.
+#
+# See journal-upload.conf(5) for details
+
+[Upload]
+# URL=
+# ServerKeyFile=@CERTIFICATEROOT@/private/journal-upload.pem
+# ServerCertificateFile=@CERTIFICATEROOT@/certs/journal-upload.pem
+# TrustedCertificateFile=@CERTIFICATEROOT@/ca/trusted.pem
diff --git a/src/journal-remote/journal-upload.h b/src/journal-remote/journal-upload.h
new file mode 100644
index 0000000..5711905
--- /dev/null
+++ b/src/journal-remote/journal-upload.h
@@ -0,0 +1,71 @@
+#pragma once
+
+#include <inttypes.h>
+
+#include "sd-event.h"
+#include "sd-journal.h"
+#include "time-util.h"
+
+typedef enum {
+ ENTRY_CURSOR = 0, /* Nothing actually written yet. */
+ ENTRY_REALTIME,
+ ENTRY_MONOTONIC,
+ ENTRY_BOOT_ID,
+ ENTRY_NEW_FIELD, /* In between fields. */
+ ENTRY_TEXT_FIELD, /* In the middle of a text field. */
+ ENTRY_BINARY_FIELD_START, /* Writing the name of a binary field. */
+ ENTRY_BINARY_FIELD_SIZE, /* Writing the size of a binary field. */
+ ENTRY_BINARY_FIELD, /* In the middle of a binary field. */
+ ENTRY_OUTRO, /* Writing '\n' */
+ ENTRY_DONE, /* Need to move to a new field. */
+} entry_state;
+
+typedef struct Uploader {
+ sd_event *events;
+ sd_event_source *sigint_event, *sigterm_event;
+
+ char *url;
+ CURL *easy;
+ bool uploading;
+ char error[CURL_ERROR_SIZE];
+ struct curl_slist *header;
+ char *answer;
+
+ sd_event_source *input_event;
+ uint64_t timeout;
+
+ /* fd stuff */
+ int input;
+
+ /* journal stuff */
+ sd_journal* journal;
+
+ entry_state entry_state;
+ const void *field_data;
+ size_t field_pos, field_length;
+
+ /* general metrics */
+ const char *state_file;
+
+ size_t entries_sent;
+ char *last_cursor, *current_cursor;
+ usec_t watchdog_timestamp;
+ usec_t watchdog_usec;
+} Uploader;
+
+#define JOURNAL_UPLOAD_POLL_TIMEOUT (10 * USEC_PER_SEC)
+
+int start_upload(Uploader *u,
+ size_t (*input_callback)(void *ptr,
+ size_t size,
+ size_t nmemb,
+ void *userdata),
+ void *data);
+
+int open_journal_for_upload(Uploader *u,
+ sd_journal *j,
+ const char *cursor,
+ bool after_cursor,
+ bool follow);
+void close_journal_input(Uploader *u);
+int check_journal_input(Uploader *u);
diff --git a/src/journal-remote/log-generator.py b/src/journal-remote/log-generator.py
new file mode 100755
index 0000000..e1725b1
--- /dev/null
+++ b/src/journal-remote/log-generator.py
@@ -0,0 +1,78 @@
+#!/usr/bin/env python3
+import sys
+import argparse
+
+PARSER = argparse.ArgumentParser()
+PARSER.add_argument('n', type=int)
+PARSER.add_argument('--dots', action='store_true')
+PARSER.add_argument('-m', '--message-size', type=int, default=200)
+PARSER.add_argument('-d', '--data-size', type=int, default=4000)
+PARSER.add_argument('--data-type', choices={'random', 'simple'})
+OPTIONS = PARSER.parse_args()
+
+template = """\
+__CURSOR=s=6863c726210b4560b7048889d8ada5c5;i=3e931;b=f446871715504074bf7049ef0718fa93;m={m:x};t=4fd05c
+__REALTIME_TIMESTAMP={realtime_ts}
+__MONOTONIC_TIMESTAMP={monotonic_ts}
+_BOOT_ID=f446871715504074bf7049ef0718fa93
+_TRANSPORT=syslog
+PRIORITY={priority}
+SYSLOG_FACILITY={facility}
+SYSLOG_IDENTIFIER=/USR/SBIN/CRON
+MESSAGE={message}
+_UID=0
+_GID=0
+_MACHINE_ID=69121ca41d12c1b69a7960174c27b618
+_HOSTNAME=hostname
+SYSLOG_PID=25721
+_PID=25721
+_SOURCE_REALTIME_TIMESTAMP={source_realtime_ts}
+DATA={data}
+"""
+
+m = 0x198603b12d7
+realtime_ts = 1404101101501873
+monotonic_ts = 1753961140951
+source_realtime_ts = 1404101101483516
+priority = 3
+facility = 6
+
+src = open('/dev/urandom', 'rb')
+
+bytes = 0
+counter = 0
+
+for i in range(OPTIONS.n):
+ message = src.read(OPTIONS.message_size)
+ message = repr(message)[2:-1]
+
+ if OPTIONS.data_type == 'random':
+ data = repr(src.read(OPTIONS.data_size))
+ else:
+ # keep the pattern non-repeating so we get a different blob every time
+ data = '{:0{}}'.format(counter, OPTIONS.data_size)
+ counter += 1
+
+ entry = template.format(m=m,
+ realtime_ts=realtime_ts,
+ monotonic_ts=monotonic_ts,
+ source_realtime_ts=source_realtime_ts,
+ priority=priority,
+ facility=facility,
+ message=message,
+ data=data)
+ m += 1
+ realtime_ts += 1
+ monotonic_ts += 1
+ source_realtime_ts += 1
+
+ bytes += len(entry)
+
+ print(entry)
+
+ if OPTIONS.dots:
+ print('.', file=sys.stderr, end='', flush=True)
+
+if OPTIONS.dots:
+ print(file=sys.stderr)
+print('Wrote {} bytes'.format(bytes), file=sys.stderr)
diff --git a/src/journal-remote/meson.build b/src/journal-remote/meson.build
new file mode 100644
index 0000000..2887daa
--- /dev/null
+++ b/src/journal-remote/meson.build
@@ -0,0 +1,71 @@
+# SPDX-License-Identifier: LGPL-2.1+
+
+systemd_journal_upload_sources = files('''
+ journal-upload.h
+ journal-upload.c
+ journal-upload-journal.c
+'''.split())
+
+libsystemd_journal_remote_sources = files('''
+ journal-remote-parse.h
+ journal-remote-parse.c
+ journal-remote-write.h
+ journal-remote-write.c
+ journal-remote.h
+ journal-remote.c
+'''.split())
+
+if conf.get('HAVE_MICROHTTPD') == 1
+ libsystemd_journal_remote_sources += files('''
+ microhttpd-util.h
+ microhttpd-util.c
+'''.split())
+endif
+
+libsystemd_journal_remote = static_library(
+ 'systemd-journal-remote',
+ libsystemd_journal_remote_sources,
+ include_directories : includes,
+ dependencies : [threads,
+ libmicrohttpd,
+ libgnutls,
+ libxz,
+ liblz4],
+ install : false)
+
+systemd_journal_remote_sources = files('''
+ journal-remote-main.c
+'''.split())
+
+systemd_journal_gatewayd_sources = files('''
+ journal-gatewayd.c
+ microhttpd-util.h
+ microhttpd-util.c
+'''.split())
+
+if conf.get('ENABLE_REMOTE') ==1 and conf.get('HAVE_LIBCURL') == 1
+ journal_upload_conf = configure_file(
+ input : 'journal-upload.conf.in',
+ output : 'journal-upload.conf',
+ configuration : substs)
+ install_data(journal_upload_conf,
+ install_dir : pkgsysconfdir)
+endif
+
+if conf.get('ENABLE_REMOTE') == 1 and conf.get('HAVE_MICROHTTPD') == 1
+ journal_remote_conf = configure_file(
+ input : 'journal-remote.conf.in',
+ output : 'journal-remote.conf',
+ configuration : substs)
+ install_data(journal_remote_conf,
+ install_dir : pkgsysconfdir)
+
+ install_data('browse.html',
+ install_dir : join_paths(pkgdatadir, 'gatewayd'))
+
+ meson.add_install_script('sh', '-c',
+ mkdir_p.format('/var/log/journal/remote'))
+ meson.add_install_script('sh', '-c',
+ '''chown 0:0 $DESTDIR/var/log/journal/remote &&
+ chmod 755 $DESTDIR/var/log/journal/remote || :''')
+endif
diff --git a/src/journal-remote/microhttpd-util.c b/src/journal-remote/microhttpd-util.c
new file mode 100644
index 0000000..5f56919
--- /dev/null
+++ b/src/journal-remote/microhttpd-util.c
@@ -0,0 +1,312 @@
+/* SPDX-License-Identifier: LGPL-2.1+ */
+
+#include <stddef.h>
+#include <stdio.h>
+#include <string.h>
+
+#if HAVE_GNUTLS
+#include <gnutls/gnutls.h>
+#include <gnutls/x509.h>
+#endif
+
+#include "alloc-util.h"
+#include "log.h"
+#include "macro.h"
+#include "microhttpd-util.h"
+#include "string-util.h"
+#include "strv.h"
+#include "util.h"
+
+void microhttpd_logger(void *arg, const char *fmt, va_list ap) {
+ char *f;
+
+ f = strjoina("microhttpd: ", fmt);
+
+ DISABLE_WARNING_FORMAT_NONLITERAL;
+ log_internalv(LOG_INFO, 0, NULL, 0, NULL, f, ap);
+ REENABLE_WARNING;
+}
+
+static int mhd_respond_internal(struct MHD_Connection *connection,
+ enum MHD_RequestTerminationCode code,
+ const char *buffer,
+ size_t size,
+ enum MHD_ResponseMemoryMode mode) {
+ assert(connection);
+
+ _cleanup_(MHD_destroy_responsep) struct MHD_Response *response
+ = MHD_create_response_from_buffer(size, (char*) buffer, mode);
+ if (!response)
+ return MHD_NO;
+
+ log_debug("Queueing response %u: %s", code, buffer);
+ MHD_add_response_header(response, "Content-Type", "text/plain");
+ return MHD_queue_response(connection, code, response);
+}
+
+int mhd_respond(struct MHD_Connection *connection,
+ enum MHD_RequestTerminationCode code,
+ const char *message) {
+
+ const char *fmt;
+
+ fmt = strjoina(message, "\n");
+
+ return mhd_respond_internal(connection, code,
+ fmt, strlen(message) + 1,
+ MHD_RESPMEM_PERSISTENT);
+}
+
+int mhd_respond_oom(struct MHD_Connection *connection) {
+ return mhd_respond(connection, MHD_HTTP_SERVICE_UNAVAILABLE, "Out of memory.");
+}
+
+int mhd_respondf(struct MHD_Connection *connection,
+ int error,
+ enum MHD_RequestTerminationCode code,
+ const char *format, ...) {
+
+ const char *fmt;
+ char *m;
+ int r;
+ va_list ap;
+
+ assert(connection);
+ assert(format);
+
+ if (error < 0)
+ error = -error;
+ errno = -error;
+ fmt = strjoina(format, "\n");
+ va_start(ap, format);
+#pragma GCC diagnostic push
+#pragma GCC diagnostic ignored "-Wformat-nonliteral"
+ r = vasprintf(&m, fmt, ap);
+#pragma GCC diagnostic pop
+ va_end(ap);
+
+ if (r < 0)
+ return respond_oom(connection);
+
+ return mhd_respond_internal(connection, code, m, r, MHD_RESPMEM_MUST_FREE);
+}
+
+#if HAVE_GNUTLS
+
+static struct {
+ const char *const names[4];
+ int level;
+ bool enabled;
+} gnutls_log_map[] = {
+ { {"0"}, LOG_DEBUG },
+ { {"1", "audit"}, LOG_WARNING, true}, /* gnutls session audit */
+ { {"2", "assert"}, LOG_DEBUG }, /* gnutls assert log */
+ { {"3", "hsk", "ext"}, LOG_DEBUG }, /* gnutls handshake log */
+ { {"4", "rec"}, LOG_DEBUG }, /* gnutls record log */
+ { {"5", "dtls"}, LOG_DEBUG }, /* gnutls DTLS log */
+ { {"6", "buf"}, LOG_DEBUG },
+ { {"7", "write", "read"}, LOG_DEBUG },
+ { {"8"}, LOG_DEBUG },
+ { {"9", "enc", "int"}, LOG_DEBUG },
+};
+
+static void log_func_gnutls(int level, const char *message) {
+ assert_se(message);
+
+ if (0 <= level && level < (int) ELEMENTSOF(gnutls_log_map)) {
+ if (gnutls_log_map[level].enabled)
+ log_internal(gnutls_log_map[level].level, 0, NULL, 0, NULL, "gnutls %d/%s: %s", level, gnutls_log_map[level].names[1], message);
+ } else {
+ log_debug("Received GNUTLS message with unknown level %d.", level);
+ log_internal(LOG_DEBUG, 0, NULL, 0, NULL, "gnutls: %s", message);
+ }
+}
+
+static void log_reset_gnutls_level(void) {
+ int i;
+
+ for (i = ELEMENTSOF(gnutls_log_map) - 1; i >= 0; i--)
+ if (gnutls_log_map[i].enabled) {
+ log_debug("Setting gnutls log level to %d", i);
+ gnutls_global_set_log_level(i);
+ break;
+ }
+}
+
+static int log_enable_gnutls_category(const char *cat) {
+ unsigned i;
+
+ if (streq(cat, "all")) {
+ for (i = 0; i < ELEMENTSOF(gnutls_log_map); i++)
+ gnutls_log_map[i].enabled = true;
+ log_reset_gnutls_level();
+ return 0;
+ } else
+ for (i = 0; i < ELEMENTSOF(gnutls_log_map); i++)
+ if (strv_contains((char**)gnutls_log_map[i].names, cat)) {
+ gnutls_log_map[i].enabled = true;
+ log_reset_gnutls_level();
+ return 0;
+ }
+ return log_error_errno(SYNTHETIC_ERRNO(EINVAL), "No such log category: %s", cat);
+}
+
+int setup_gnutls_logger(char **categories) {
+ char **cat;
+ int r;
+
+ gnutls_global_set_log_function(log_func_gnutls);
+
+ if (categories) {
+ STRV_FOREACH(cat, categories) {
+ r = log_enable_gnutls_category(*cat);
+ if (r < 0)
+ return r;
+ }
+ } else
+ log_reset_gnutls_level();
+
+ return 0;
+}
+
+static int verify_cert_authorized(gnutls_session_t session) {
+ unsigned status;
+ gnutls_certificate_type_t type;
+ gnutls_datum_t out;
+ int r;
+
+ r = gnutls_certificate_verify_peers2(session, &status);
+ if (r < 0)
+ return log_error_errno(r, "gnutls_certificate_verify_peers2 failed: %m");
+
+ type = gnutls_certificate_type_get(session);
+ r = gnutls_certificate_verification_status_print(status, type, &out, 0);
+ if (r < 0)
+ return log_error_errno(r, "gnutls_certificate_verification_status_print failed: %m");
+
+ log_debug("Certificate status: %s", out.data);
+ gnutls_free(out.data);
+
+ return status == 0 ? 0 : -EPERM;
+}
+
+static int get_client_cert(gnutls_session_t session, gnutls_x509_crt_t *client_cert) {
+ const gnutls_datum_t *pcert;
+ unsigned listsize;
+ gnutls_x509_crt_t cert;
+ int r;
+
+ assert(session);
+ assert(client_cert);
+
+ pcert = gnutls_certificate_get_peers(session, &listsize);
+ if (!pcert || !listsize)
+ return log_error_errno(SYNTHETIC_ERRNO(EINVAL),
+ "Failed to retrieve certificate chain");
+
+ r = gnutls_x509_crt_init(&cert);
+ if (r < 0) {
+ log_error("Failed to initialize client certificate");
+ return r;
+ }
+
+ /* Note that by passing values between 0 and listsize here, you
+ can get access to the CA's certs */
+ r = gnutls_x509_crt_import(cert, &pcert[0], GNUTLS_X509_FMT_DER);
+ if (r < 0) {
+ log_error("Failed to import client certificate");
+ gnutls_x509_crt_deinit(cert);
+ return r;
+ }
+
+ *client_cert = cert;
+ return 0;
+}
+
+static int get_auth_dn(gnutls_x509_crt_t client_cert, char **buf) {
+ size_t len = 0;
+ int r;
+
+ assert(buf);
+ assert(*buf == NULL);
+
+ r = gnutls_x509_crt_get_dn(client_cert, NULL, &len);
+ if (r != GNUTLS_E_SHORT_MEMORY_BUFFER) {
+ log_error("gnutls_x509_crt_get_dn failed");
+ return r;
+ }
+
+ *buf = malloc(len);
+ if (!*buf)
+ return log_oom();
+
+ gnutls_x509_crt_get_dn(client_cert, *buf, &len);
+ return 0;
+}
+
+static void gnutls_x509_crt_deinitp(gnutls_x509_crt_t *p) {
+ gnutls_x509_crt_deinit(*p);
+}
+
+int check_permissions(struct MHD_Connection *connection, int *code, char **hostname) {
+ const union MHD_ConnectionInfo *ci;
+ gnutls_session_t session;
+ _cleanup_(gnutls_x509_crt_deinitp) gnutls_x509_crt_t client_cert = NULL;
+ _cleanup_free_ char *buf = NULL;
+ int r;
+
+ assert(connection);
+ assert(code);
+
+ *code = 0;
+
+ ci = MHD_get_connection_info(connection,
+ MHD_CONNECTION_INFO_GNUTLS_SESSION);
+ if (!ci) {
+ log_error("MHD_get_connection_info failed: session is unencrypted");
+ *code = mhd_respond(connection, MHD_HTTP_FORBIDDEN,
+ "Encrypted connection is required");
+ return -EPERM;
+ }
+ session = ci->tls_session;
+ assert(session);
+
+ r = get_client_cert(session, &client_cert);
+ if (r < 0) {
+ *code = mhd_respond(connection, MHD_HTTP_UNAUTHORIZED,
+ "Authorization through certificate is required");
+ return -EPERM;
+ }
+
+ r = get_auth_dn(client_cert, &buf);
+ if (r < 0) {
+ *code = mhd_respond(connection, MHD_HTTP_UNAUTHORIZED,
+ "Failed to determine distinguished name from certificate");
+ return -EPERM;
+ }
+
+ log_debug("Connection from %s", buf);
+
+ if (hostname)
+ *hostname = TAKE_PTR(buf);
+
+ r = verify_cert_authorized(session);
+ if (r < 0) {
+ log_warning("Client is not authorized");
+ *code = mhd_respond(connection, MHD_HTTP_UNAUTHORIZED,
+ "Client certificate not signed by recognized authority");
+ }
+ return r;
+}
+
+#else
+int check_permissions(struct MHD_Connection *connection, int *code, char **hostname) {
+ return -EPERM;
+}
+
+int setup_gnutls_logger(char **categories) {
+ if (categories)
+ log_notice("Ignoring specified gnutls logging categories — gnutls not available.");
+ return 0;
+}
+#endif
diff --git a/src/journal-remote/microhttpd-util.h b/src/journal-remote/microhttpd-util.h
new file mode 100644
index 0000000..ba51d84
--- /dev/null
+++ b/src/journal-remote/microhttpd-util.h
@@ -0,0 +1,78 @@
+/* SPDX-License-Identifier: LGPL-2.1+ */
+#pragma once
+
+#include <microhttpd.h>
+#include <stdarg.h>
+
+#include "macro.h"
+
+/* Those defines are added when options are renamed. If the old names
+ * are not '#define'd, then they are not deprecated yet and there are
+ * enum elements with the same name. Hence let's check for the *old* name,
+ * and define the new name by the value of the old name. */
+
+/* Renamed in µhttpd 0.9.51 */
+#ifndef MHD_USE_PIPE_FOR_SHUTDOWN
+# define MHD_USE_ITC MHD_USE_PIPE_FOR_SHUTDOWN
+#endif
+
+/* Renamed in µhttpd 0.9.52 */
+#ifndef MHD_USE_EPOLL_LINUX_ONLY
+# define MHD_USE_EPOLL MHD_USE_EPOLL_LINUX_ONLY
+#endif
+
+/* Renamed in µhttpd 0.9.52 */
+#ifndef MHD_USE_SSL
+# define MHD_USE_TLS MHD_USE_SSL
+#endif
+
+/* Renamed in µhttpd 0.9.53 */
+#ifndef MHD_USE_POLL_INTERNALLY
+# define MHD_USE_POLL_INTERNAL_THREAD MHD_USE_POLL_INTERNALLY
+#endif
+
+/* Both the old and new names are defines, check for the new one. */
+
+/* Compatiblity with libmicrohttpd < 0.9.38 */
+#ifndef MHD_HTTP_NOT_ACCEPTABLE
+# define MHD_HTTP_NOT_ACCEPTABLE MHD_HTTP_METHOD_NOT_ACCEPTABLE
+#endif
+
+/* Renamed in µhttpd 0.9.53 */
+#ifndef MHD_HTTP_PAYLOAD_TOO_LARGE
+# define MHD_HTTP_PAYLOAD_TOO_LARGE MHD_HTTP_REQUEST_ENTITY_TOO_LARGE
+#endif
+
+#if MHD_VERSION < 0x00094203
+# define MHD_create_response_from_fd_at_offset64 MHD_create_response_from_fd_at_offset
+#endif
+
+void microhttpd_logger(void *arg, const char *fmt, va_list ap) _printf_(2, 0);
+
+/* respond_oom() must be usable with return, hence this form. */
+#define respond_oom(connection) log_oom(), mhd_respond_oom(connection)
+
+int mhd_respondf(struct MHD_Connection *connection,
+ int error,
+ unsigned code,
+ const char *format, ...) _printf_(4,5);
+
+int mhd_respond(struct MHD_Connection *connection,
+ unsigned code,
+ const char *message);
+
+int mhd_respond_oom(struct MHD_Connection *connection);
+
+int check_permissions(struct MHD_Connection *connection, int *code, char **hostname);
+
+/* Set gnutls internal logging function to a callback which uses our
+ * own logging framework.
+ *
+ * gnutls categories are additionally filtered by our internal log
+ * level, so it should be set fairly high to capture all potentially
+ * interesting events without overwhelming detail.
+ */
+int setup_gnutls_logger(char **categories);
+
+DEFINE_TRIVIAL_CLEANUP_FUNC(struct MHD_Daemon*, MHD_stop_daemon);
+DEFINE_TRIVIAL_CLEANUP_FUNC(struct MHD_Response*, MHD_destroy_response);