summaryrefslogtreecommitdiffstats
path: root/source3/smbd/notifyd
diff options
context:
space:
mode:
Diffstat (limited to 'source3/smbd/notifyd')
-rw-r--r--source3/smbd/notifyd/fcn_wait.c270
-rw-r--r--source3/smbd/notifyd/fcn_wait.h38
-rw-r--r--source3/smbd/notifyd/notifyd.c1428
-rw-r--r--source3/smbd/notifyd/notifyd.h145
-rw-r--r--source3/smbd/notifyd/notifyd_db.c165
-rw-r--r--source3/smbd/notifyd/notifyd_db.h27
-rw-r--r--source3/smbd/notifyd/notifyd_entry.c42
-rw-r--r--source3/smbd/notifyd/notifyd_private.h49
-rw-r--r--source3/smbd/notifyd/notifydd.c89
-rw-r--r--source3/smbd/notifyd/test_notifyd.c347
-rw-r--r--source3/smbd/notifyd/tests.c118
-rw-r--r--source3/smbd/notifyd/wscript_build44
12 files changed, 2762 insertions, 0 deletions
diff --git a/source3/smbd/notifyd/fcn_wait.c b/source3/smbd/notifyd/fcn_wait.c
new file mode 100644
index 0000000..e32240d
--- /dev/null
+++ b/source3/smbd/notifyd/fcn_wait.c
@@ -0,0 +1,270 @@
+/*
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#include "replace.h"
+#include "fcn_wait.h"
+#include "notifyd.h"
+#include "lib/util/tevent_ntstatus.h"
+
+struct fcn_event {
+ struct fcn_event *prev, *next;
+ struct notify_event_msg msg;
+};
+
+struct fcn_wait_state {
+ struct tevent_context *ev;
+ struct messaging_context *msg_ctx;
+ struct server_id notifyd;
+ const char *path;
+
+ struct tevent_req *recv_subreq;
+
+ struct fcn_event *events;
+};
+
+static bool fcn_wait_cancel(struct tevent_req *req);
+static void fcn_wait_cleanup(
+ struct tevent_req *req, enum tevent_req_state req_state);
+static bool fcn_wait_filter(struct messaging_rec *rec, void *private_data);
+static void fcn_wait_done(struct tevent_req *subreq);
+
+struct tevent_req *fcn_wait_send(
+ TALLOC_CTX *mem_ctx,
+ struct tevent_context *ev,
+ struct messaging_context *msg_ctx,
+ struct server_id notifyd,
+ const char *path,
+ uint32_t filter,
+ uint32_t subdir_filter)
+{
+ struct tevent_req *req = NULL;
+ struct fcn_wait_state *state = NULL;
+ struct notify_rec_change_msg msg = {
+ .instance.filter = filter,
+ .instance.subdir_filter = subdir_filter,
+ };
+ struct iovec iov[2];
+ NTSTATUS status;
+
+ req = tevent_req_create(mem_ctx, &state, struct fcn_wait_state);
+ if (req == NULL) {
+ return NULL;
+ }
+ state->ev = ev;
+ state->msg_ctx = msg_ctx;
+ state->notifyd = notifyd;
+ state->path = path;
+
+ state->recv_subreq = messaging_filtered_read_send(
+ state, ev, msg_ctx, fcn_wait_filter, req);
+ if (tevent_req_nomem(state->recv_subreq, req)) {
+ return tevent_req_post(req, ev);
+ }
+ tevent_req_set_callback(state->recv_subreq, fcn_wait_done, req);
+ tevent_req_set_cleanup_fn(req, fcn_wait_cleanup);
+
+ clock_gettime_mono(&msg.instance.creation_time);
+ msg.instance.private_data = state;
+
+ iov[0].iov_base = &msg;
+ iov[0].iov_len = offsetof(struct notify_rec_change_msg, path);
+ iov[1].iov_base = discard_const_p(char, path);
+ iov[1].iov_len = strlen(path)+1;
+
+ status = messaging_send_iov(
+ msg_ctx, /* msg_ctx */
+ notifyd, /* dst */
+ MSG_SMB_NOTIFY_REC_CHANGE, /* mst_type */
+ iov, /* iov */
+ ARRAY_SIZE(iov), /* iovlen */
+ NULL, /* fds */
+ 0); /* num_fds */
+ if (tevent_req_nterror(req, status)) {
+ DBG_DEBUG("messaging_send_iov failed: %s\n",
+ nt_errstr(status));
+ return tevent_req_post(req, ev);
+ }
+ tevent_req_set_cancel_fn(req, fcn_wait_cancel);
+
+ return req;
+}
+
+static bool fcn_wait_cancel(struct tevent_req *req)
+{
+ struct fcn_wait_state *state = tevent_req_data(
+ req, struct fcn_wait_state);
+ struct notify_rec_change_msg msg = {
+ .instance.filter = 0, /* filter==0 is a delete msg */
+ .instance.subdir_filter = 0,
+ };
+ struct iovec iov[2];
+ NTSTATUS status;
+
+ clock_gettime_mono(&msg.instance.creation_time);
+ msg.instance.private_data = state;
+
+ iov[0].iov_base = &msg;
+ iov[0].iov_len = offsetof(struct notify_rec_change_msg, path);
+ iov[1].iov_base = discard_const_p(char, state->path);
+ iov[1].iov_len = strlen(state->path)+1;
+
+ status = messaging_send_iov(
+ state->msg_ctx, /* msg_ctx */
+ state->notifyd, /* dst */
+ MSG_SMB_NOTIFY_REC_CHANGE, /* mst_type */
+ iov, /* iov */
+ ARRAY_SIZE(iov), /* iovlen */
+ NULL, /* fds */
+ 0); /* num_fds */
+ if (!NT_STATUS_IS_OK(status)) {
+ DBG_DEBUG("messaging_send_iov failed: %s\n",
+ nt_errstr(status));
+ return false;
+ }
+
+ fcn_wait_cleanup(req, 0); /* fcn_wait_cleanup ignores req_state */
+ tevent_req_defer_callback(req, state->ev);
+ tevent_req_nterror(req, NT_STATUS_CANCELLED);
+
+ return true;
+}
+
+static void fcn_wait_cleanup(
+ struct tevent_req *req, enum tevent_req_state req_state)
+{
+ struct fcn_wait_state *state = tevent_req_data(
+ req, struct fcn_wait_state);
+ TALLOC_FREE(state->recv_subreq);
+}
+
+static bool fcn_wait_filter(struct messaging_rec *rec, void *private_data)
+{
+ struct tevent_req *req = talloc_get_type_abort(
+ private_data, struct tevent_req);
+ struct fcn_wait_state *state = tevent_req_data(
+ req, struct fcn_wait_state);
+ struct notify_event_msg msg = { .action = 0 };
+ struct fcn_event *evt = NULL;
+
+ if (rec->msg_type != MSG_PVFS_NOTIFY) {
+ DBG_DEBUG("Ignoring msg %"PRIu32"\n", rec->msg_type);
+ return false;
+ }
+
+ /*
+ * We need at least the trailing '\0' for the path
+ */
+ if (rec->buf.length < (offsetof(struct notify_event_msg, path) + 1)) {
+ DBG_DEBUG("Ignoring short (%zu) msg\n", rec->buf.length);
+ return false;
+ }
+ if (rec->buf.data[rec->buf.length-1] != '\0') {
+ DBG_DEBUG("Expected 0-terminated path\n");
+ return false;
+ }
+
+ memcpy(&msg, rec->buf.data, sizeof(msg));
+
+ if (msg.private_data != state) {
+ DBG_DEBUG("Got private_data=%p, expected %p\n",
+ msg.private_data,
+ state);
+ return false;
+ }
+
+ evt = talloc_memdup(state, rec->buf.data, rec->buf.length);
+ if (evt == NULL) {
+ DBG_DEBUG("talloc_memdup failed\n");
+ return false;
+ }
+ talloc_set_name_const(evt, "struct fcn_event");
+
+ /*
+ * TODO: Sort by timestamp
+ */
+
+ DLIST_ADD_END(state->events, evt);
+
+ tevent_req_defer_callback(req, state->ev);
+ tevent_req_notify_callback(req);
+
+ return false;
+}
+
+static void fcn_wait_done(struct tevent_req *subreq)
+{
+ struct tevent_req *req = tevent_req_callback_data(
+ subreq, struct tevent_req);
+ int ret;
+
+ ret = messaging_filtered_read_recv(subreq, NULL, NULL);
+ TALLOC_FREE(subreq);
+ if (ret != 0) {
+ DBG_DEBUG("messaging_filtered_read failed: %s\n",
+ strerror(ret));
+ tevent_req_nterror(req, map_nt_error_from_unix(ret));
+ return;
+ }
+
+ /*
+ * We should never have gotten here, all work is done from the
+ * filter function.
+ */
+ tevent_req_nterror(req, NT_STATUS_INTERNAL_ERROR);
+}
+
+NTSTATUS fcn_wait_recv(
+ struct tevent_req *req,
+ TALLOC_CTX *mem_ctx,
+ struct timespec *when,
+ uint32_t *action,
+ char **path)
+{
+ struct fcn_wait_state *state = tevent_req_data(
+ req, struct fcn_wait_state);
+ struct fcn_event *evt = NULL;
+ NTSTATUS status;
+
+ if (!tevent_req_is_in_progress(req) &&
+ tevent_req_is_nterror(req, &status)) {
+ return status;
+ }
+ evt = state->events;
+ if (evt == NULL) {
+ return NT_STATUS_RETRY;
+ }
+
+ if (path != NULL) {
+ *path = talloc_strdup(mem_ctx, evt->msg.path);
+ if ((*path) == NULL) {
+ return NT_STATUS_NO_MEMORY;
+ }
+ }
+ if (when != NULL) {
+ *when = evt->msg.when;
+ }
+ if (action != NULL) {
+ *action = evt->msg.action;
+ }
+
+ DLIST_REMOVE(state->events, evt);
+
+ if (state->events != NULL) {
+ tevent_req_defer_callback(req, state->ev);
+ tevent_req_notify_callback(req);
+ }
+
+ return NT_STATUS_OK;
+}
diff --git a/source3/smbd/notifyd/fcn_wait.h b/source3/smbd/notifyd/fcn_wait.h
new file mode 100644
index 0000000..daadee4
--- /dev/null
+++ b/source3/smbd/notifyd/fcn_wait.h
@@ -0,0 +1,38 @@
+/*
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#ifndef __NOTIFYD_FCN_WAIT_H__
+#define __NOTIFYD_FCN_WAIT_H__
+
+#include "replace.h"
+#include "messages.h"
+#include "librpc/gen_ndr/server_id.h"
+
+struct tevent_req *fcn_wait_send(
+ TALLOC_CTX *mem_ctx,
+ struct tevent_context *ev,
+ struct messaging_context *msg_ctx,
+ struct server_id notifyd,
+ const char *path,
+ uint32_t filter,
+ uint32_t subdir_filter);
+NTSTATUS fcn_wait_recv(
+ struct tevent_req *req,
+ TALLOC_CTX *mem_ctx,
+ struct timespec *when,
+ uint32_t *action,
+ char **path);
+
+#endif
diff --git a/source3/smbd/notifyd/notifyd.c b/source3/smbd/notifyd/notifyd.c
new file mode 100644
index 0000000..475c8e9
--- /dev/null
+++ b/source3/smbd/notifyd/notifyd.c
@@ -0,0 +1,1428 @@
+/*
+ * Unix SMB/CIFS implementation.
+ *
+ * Copyright (C) Volker Lendecke 2014
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#include "replace.h"
+#include <tevent.h>
+#include "notifyd_private.h"
+#include "lib/util/server_id.h"
+#include "lib/util/data_blob.h"
+#include "librpc/gen_ndr/notify.h"
+#include "librpc/gen_ndr/messaging.h"
+#include "librpc/gen_ndr/server_id.h"
+#include "lib/dbwrap/dbwrap.h"
+#include "lib/dbwrap/dbwrap_rbt.h"
+#include "messages.h"
+#include "tdb.h"
+#include "util_tdb.h"
+#include "notifyd.h"
+#include "lib/util/server_id_db.h"
+#include "lib/util/tevent_unix.h"
+#include "lib/util/tevent_ntstatus.h"
+#include "ctdbd_conn.h"
+#include "ctdb_srvids.h"
+#include "server_id_db_util.h"
+#include "lib/util/iov_buf.h"
+#include "messages_util.h"
+
+#ifdef CLUSTER_SUPPORT
+#include "ctdb_protocol.h"
+#endif
+
+struct notifyd_peer;
+
+/*
+ * All of notifyd's state
+ */
+
+struct notifyd_state {
+ struct tevent_context *ev;
+ struct messaging_context *msg_ctx;
+ struct ctdbd_connection *ctdbd_conn;
+
+ /*
+ * Database of everything clients show interest in. Indexed by
+ * absolute path. The database keys are not 0-terminated
+ * to allow the criticial operation, notifyd_trigger, to walk
+ * the structure from the top without adding intermediate 0s.
+ * The database records contain an array of
+ *
+ * struct notifyd_instance
+ *
+ * to be maintained and parsed by notifyd_parse_entry()
+ */
+ struct db_context *entries;
+
+ /*
+ * In the cluster case, this is the place where we store a log
+ * of all MSG_SMB_NOTIFY_REC_CHANGE messages. We just 1:1
+ * forward them to our peer notifyd's in the cluster once a
+ * second or when the log grows too large.
+ */
+
+ struct messaging_reclog *log;
+
+ /*
+ * Array of companion notifyd's in a cluster. Every notifyd
+ * broadcasts its messaging_reclog to every other notifyd in
+ * the cluster. This is done by making ctdb send a message to
+ * srvid CTDB_SRVID_SAMBA_NOTIFY_PROXY with destination node
+ * number CTDB_BROADCAST_CONNECTED. Everybody in the cluster who
+ * had called register_with_ctdbd this srvid will receive the
+ * broadcasts.
+ *
+ * Database replication happens via these broadcasts. Also,
+ * they serve as liveness indication. If a notifyd receives a
+ * broadcast from an unknown peer, it will create one for this
+ * srvid. Also when we don't hear anything from a peer for a
+ * while, we will discard it.
+ */
+
+ struct notifyd_peer **peers;
+ size_t num_peers;
+
+ sys_notify_watch_fn sys_notify_watch;
+ struct sys_notify_context *sys_notify_ctx;
+};
+
+struct notifyd_peer {
+ struct notifyd_state *state;
+ struct server_id pid;
+ uint64_t rec_index;
+ struct db_context *db;
+ time_t last_broadcast;
+};
+
+static void notifyd_rec_change(struct messaging_context *msg_ctx,
+ void *private_data, uint32_t msg_type,
+ struct server_id src, DATA_BLOB *data);
+static void notifyd_trigger(struct messaging_context *msg_ctx,
+ void *private_data, uint32_t msg_type,
+ struct server_id src, DATA_BLOB *data);
+static void notifyd_get_db(struct messaging_context *msg_ctx,
+ void *private_data, uint32_t msg_type,
+ struct server_id src, DATA_BLOB *data);
+
+#ifdef CLUSTER_SUPPORT
+static void notifyd_got_db(struct messaging_context *msg_ctx,
+ void *private_data, uint32_t msg_type,
+ struct server_id src, DATA_BLOB *data);
+static void notifyd_broadcast_reclog(struct ctdbd_connection *ctdbd_conn,
+ struct server_id src,
+ struct messaging_reclog *log);
+#endif
+static void notifyd_sys_callback(struct sys_notify_context *ctx,
+ void *private_data, struct notify_event *ev,
+ uint32_t filter);
+
+#ifdef CLUSTER_SUPPORT
+static struct tevent_req *notifyd_broadcast_reclog_send(
+ TALLOC_CTX *mem_ctx, struct tevent_context *ev,
+ struct ctdbd_connection *ctdbd_conn, struct server_id src,
+ struct messaging_reclog *log);
+static int notifyd_broadcast_reclog_recv(struct tevent_req *req);
+
+static struct tevent_req *notifyd_clean_peers_send(
+ TALLOC_CTX *mem_ctx, struct tevent_context *ev,
+ struct notifyd_state *notifyd);
+static int notifyd_clean_peers_recv(struct tevent_req *req);
+#endif
+
+static int sys_notify_watch_dummy(
+ TALLOC_CTX *mem_ctx,
+ struct sys_notify_context *ctx,
+ const char *path,
+ uint32_t *filter,
+ uint32_t *subdir_filter,
+ void (*callback)(struct sys_notify_context *ctx,
+ void *private_data,
+ struct notify_event *ev,
+ uint32_t filter),
+ void *private_data,
+ void *handle_p)
+{
+ void **handle = handle_p;
+ *handle = NULL;
+ return 0;
+}
+
+#ifdef CLUSTER_SUPPORT
+static void notifyd_broadcast_reclog_finished(struct tevent_req *subreq);
+static void notifyd_clean_peers_finished(struct tevent_req *subreq);
+static int notifyd_snoop_broadcast(struct tevent_context *ev,
+ uint32_t src_vnn, uint32_t dst_vnn,
+ uint64_t dst_srvid,
+ const uint8_t *msg, size_t msglen,
+ void *private_data);
+#endif
+
+struct tevent_req *notifyd_send(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
+ struct messaging_context *msg_ctx,
+ struct ctdbd_connection *ctdbd_conn,
+ sys_notify_watch_fn sys_notify_watch,
+ struct sys_notify_context *sys_notify_ctx)
+{
+ struct tevent_req *req;
+#ifdef CLUSTER_SUPPORT
+ struct tevent_req *subreq;
+#endif
+ struct notifyd_state *state;
+ struct server_id_db *names_db;
+ NTSTATUS status;
+ int ret;
+
+ req = tevent_req_create(mem_ctx, &state, struct notifyd_state);
+ if (req == NULL) {
+ return NULL;
+ }
+ state->ev = ev;
+ state->msg_ctx = msg_ctx;
+ state->ctdbd_conn = ctdbd_conn;
+
+ if (sys_notify_watch == NULL) {
+ sys_notify_watch = sys_notify_watch_dummy;
+ }
+
+ state->sys_notify_watch = sys_notify_watch;
+ state->sys_notify_ctx = sys_notify_ctx;
+
+ state->entries = db_open_rbt(state);
+ if (tevent_req_nomem(state->entries, req)) {
+ return tevent_req_post(req, ev);
+ }
+
+ status = messaging_register(msg_ctx, state, MSG_SMB_NOTIFY_REC_CHANGE,
+ notifyd_rec_change);
+ if (tevent_req_nterror(req, status)) {
+ return tevent_req_post(req, ev);
+ }
+
+ status = messaging_register(msg_ctx, state, MSG_SMB_NOTIFY_TRIGGER,
+ notifyd_trigger);
+ if (tevent_req_nterror(req, status)) {
+ goto deregister_rec_change;
+ }
+
+ status = messaging_register(msg_ctx, state, MSG_SMB_NOTIFY_GET_DB,
+ notifyd_get_db);
+ if (tevent_req_nterror(req, status)) {
+ goto deregister_trigger;
+ }
+
+ names_db = messaging_names_db(msg_ctx);
+
+ ret = server_id_db_set_exclusive(names_db, "notify-daemon");
+ if (ret != 0) {
+ DBG_DEBUG("server_id_db_add failed: %s\n",
+ strerror(ret));
+ tevent_req_error(req, ret);
+ goto deregister_get_db;
+ }
+
+ if (ctdbd_conn == NULL) {
+ /*
+ * No cluster around, skip the database replication
+ * engine
+ */
+ return req;
+ }
+
+#ifdef CLUSTER_SUPPORT
+ status = messaging_register(msg_ctx, state, MSG_SMB_NOTIFY_DB,
+ notifyd_got_db);
+ if (tevent_req_nterror(req, status)) {
+ goto deregister_get_db;
+ }
+
+ state->log = talloc_zero(state, struct messaging_reclog);
+ if (tevent_req_nomem(state->log, req)) {
+ goto deregister_db;
+ }
+
+ subreq = notifyd_broadcast_reclog_send(
+ state->log, ev, ctdbd_conn,
+ messaging_server_id(msg_ctx),
+ state->log);
+ if (tevent_req_nomem(subreq, req)) {
+ goto deregister_db;
+ }
+ tevent_req_set_callback(subreq,
+ notifyd_broadcast_reclog_finished,
+ req);
+
+ subreq = notifyd_clean_peers_send(state, ev, state);
+ if (tevent_req_nomem(subreq, req)) {
+ goto deregister_db;
+ }
+ tevent_req_set_callback(subreq, notifyd_clean_peers_finished,
+ req);
+
+ ret = register_with_ctdbd(ctdbd_conn,
+ CTDB_SRVID_SAMBA_NOTIFY_PROXY,
+ notifyd_snoop_broadcast, state);
+ if (ret != 0) {
+ tevent_req_error(req, ret);
+ goto deregister_db;
+ }
+#endif
+
+ return req;
+
+#ifdef CLUSTER_SUPPORT
+deregister_db:
+ messaging_deregister(msg_ctx, MSG_SMB_NOTIFY_DB, state);
+#endif
+deregister_get_db:
+ messaging_deregister(msg_ctx, MSG_SMB_NOTIFY_GET_DB, state);
+deregister_trigger:
+ messaging_deregister(msg_ctx, MSG_SMB_NOTIFY_TRIGGER, state);
+deregister_rec_change:
+ messaging_deregister(msg_ctx, MSG_SMB_NOTIFY_REC_CHANGE, state);
+ return tevent_req_post(req, ev);
+}
+
+#ifdef CLUSTER_SUPPORT
+
+static void notifyd_broadcast_reclog_finished(struct tevent_req *subreq)
+{
+ struct tevent_req *req = tevent_req_callback_data(
+ subreq, struct tevent_req);
+ int ret;
+
+ ret = notifyd_broadcast_reclog_recv(subreq);
+ TALLOC_FREE(subreq);
+ tevent_req_error(req, ret);
+}
+
+static void notifyd_clean_peers_finished(struct tevent_req *subreq)
+{
+ struct tevent_req *req = tevent_req_callback_data(
+ subreq, struct tevent_req);
+ int ret;
+
+ ret = notifyd_clean_peers_recv(subreq);
+ TALLOC_FREE(subreq);
+ tevent_req_error(req, ret);
+}
+
+#endif
+
+int notifyd_recv(struct tevent_req *req)
+{
+ return tevent_req_simple_recv_unix(req);
+}
+
+static bool notifyd_apply_rec_change(
+ const struct server_id *client,
+ const char *path, size_t pathlen,
+ const struct notify_instance *chg,
+ struct db_context *entries,
+ sys_notify_watch_fn sys_notify_watch,
+ struct sys_notify_context *sys_notify_ctx,
+ struct messaging_context *msg_ctx)
+{
+ struct db_record *rec = NULL;
+ struct notifyd_instance *instances = NULL;
+ size_t num_instances;
+ size_t i;
+ struct notifyd_instance *instance = NULL;
+ TDB_DATA value;
+ NTSTATUS status;
+ bool ok = false;
+
+ if (pathlen == 0) {
+ DBG_WARNING("pathlen==0\n");
+ return false;
+ }
+ if (path[pathlen-1] != '\0') {
+ DBG_WARNING("path not 0-terminated\n");
+ return false;
+ }
+
+ DBG_DEBUG("path=%s, filter=%"PRIu32", subdir_filter=%"PRIu32", "
+ "private_data=%p\n",
+ path,
+ chg->filter,
+ chg->subdir_filter,
+ chg->private_data);
+
+ rec = dbwrap_fetch_locked(
+ entries, entries,
+ make_tdb_data((const uint8_t *)path, pathlen-1));
+
+ if (rec == NULL) {
+ DBG_WARNING("dbwrap_fetch_locked failed\n");
+ goto fail;
+ }
+
+ num_instances = 0;
+ value = dbwrap_record_get_value(rec);
+
+ if (value.dsize != 0) {
+ if (!notifyd_parse_entry(value.dptr, value.dsize, NULL,
+ &num_instances)) {
+ goto fail;
+ }
+ }
+
+ /*
+ * Overallocate by one instance to avoid a realloc when adding
+ */
+ instances = talloc_array(rec, struct notifyd_instance,
+ num_instances + 1);
+ if (instances == NULL) {
+ DBG_WARNING("talloc failed\n");
+ goto fail;
+ }
+
+ if (value.dsize != 0) {
+ memcpy(instances, value.dptr, value.dsize);
+ }
+
+ for (i=0; i<num_instances; i++) {
+ instance = &instances[i];
+
+ if (server_id_equal(&instance->client, client) &&
+ (instance->instance.private_data == chg->private_data)) {
+ break;
+ }
+ }
+
+ if (i < num_instances) {
+ instance->instance = *chg;
+ } else {
+ /*
+ * We've overallocated for one instance
+ */
+ instance = &instances[num_instances];
+
+ *instance = (struct notifyd_instance) {
+ .client = *client,
+ .instance = *chg,
+ .internal_filter = chg->filter,
+ .internal_subdir_filter = chg->subdir_filter
+ };
+
+ num_instances += 1;
+ }
+
+ if ((instance->instance.filter != 0) ||
+ (instance->instance.subdir_filter != 0)) {
+ int ret;
+
+ TALLOC_FREE(instance->sys_watch);
+
+ ret = sys_notify_watch(entries, sys_notify_ctx, path,
+ &instance->internal_filter,
+ &instance->internal_subdir_filter,
+ notifyd_sys_callback, msg_ctx,
+ &instance->sys_watch);
+ if (ret != 0) {
+ DBG_WARNING("sys_notify_watch for [%s] returned %s\n",
+ path, strerror(errno));
+ }
+ }
+
+ if ((instance->instance.filter == 0) &&
+ (instance->instance.subdir_filter == 0)) {
+ /* This is a delete request */
+ TALLOC_FREE(instance->sys_watch);
+ *instance = instances[num_instances-1];
+ num_instances -= 1;
+ }
+
+ DBG_DEBUG("%s has %zu instances\n", path, num_instances);
+
+ if (num_instances == 0) {
+ status = dbwrap_record_delete(rec);
+ if (!NT_STATUS_IS_OK(status)) {
+ DBG_WARNING("dbwrap_record_delete returned %s\n",
+ nt_errstr(status));
+ goto fail;
+ }
+ } else {
+ value = make_tdb_data(
+ (uint8_t *)instances,
+ sizeof(struct notifyd_instance) * num_instances);
+
+ status = dbwrap_record_store(rec, value, 0);
+ if (!NT_STATUS_IS_OK(status)) {
+ DBG_WARNING("dbwrap_record_store returned %s\n",
+ nt_errstr(status));
+ goto fail;
+ }
+ }
+
+ ok = true;
+fail:
+ TALLOC_FREE(rec);
+ return ok;
+}
+
+static void notifyd_sys_callback(struct sys_notify_context *ctx,
+ void *private_data, struct notify_event *ev,
+ uint32_t filter)
+{
+ struct messaging_context *msg_ctx = talloc_get_type_abort(
+ private_data, struct messaging_context);
+ struct notify_trigger_msg msg;
+ struct iovec iov[4];
+ char slash = '/';
+
+ msg = (struct notify_trigger_msg) {
+ .when = timespec_current(),
+ .action = ev->action,
+ .filter = filter,
+ };
+
+ iov[0].iov_base = &msg;
+ iov[0].iov_len = offsetof(struct notify_trigger_msg, path);
+ iov[1].iov_base = discard_const_p(char, ev->dir);
+ iov[1].iov_len = strlen(ev->dir);
+ iov[2].iov_base = &slash;
+ iov[2].iov_len = 1;
+ iov[3].iov_base = discard_const_p(char, ev->path);
+ iov[3].iov_len = strlen(ev->path)+1;
+
+ messaging_send_iov(
+ msg_ctx, messaging_server_id(msg_ctx),
+ MSG_SMB_NOTIFY_TRIGGER, iov, ARRAY_SIZE(iov), NULL, 0);
+}
+
+static bool notifyd_parse_rec_change(uint8_t *buf, size_t bufsize,
+ struct notify_rec_change_msg **pmsg,
+ size_t *pathlen)
+{
+ struct notify_rec_change_msg *msg;
+
+ if (bufsize < offsetof(struct notify_rec_change_msg, path) + 1) {
+ DBG_WARNING("message too short, ignoring: %zu\n", bufsize);
+ return false;
+ }
+
+ *pmsg = msg = (struct notify_rec_change_msg *)buf;
+ *pathlen = bufsize - offsetof(struct notify_rec_change_msg, path);
+
+ DBG_DEBUG("Got rec_change_msg filter=%"PRIu32", "
+ "subdir_filter=%"PRIu32", private_data=%p, path=%.*s\n",
+ msg->instance.filter,
+ msg->instance.subdir_filter,
+ msg->instance.private_data,
+ (int)(*pathlen),
+ msg->path);
+
+ return true;
+}
+
+static void notifyd_rec_change(struct messaging_context *msg_ctx,
+ void *private_data, uint32_t msg_type,
+ struct server_id src, DATA_BLOB *data)
+{
+ struct notifyd_state *state = talloc_get_type_abort(
+ private_data, struct notifyd_state);
+ struct server_id_buf idbuf;
+ struct notify_rec_change_msg *msg;
+ size_t pathlen;
+ bool ok;
+ struct notify_instance instance;
+
+ DBG_DEBUG("Got %zu bytes from %s\n", data->length,
+ server_id_str_buf(src, &idbuf));
+
+ ok = notifyd_parse_rec_change(data->data, data->length,
+ &msg, &pathlen);
+ if (!ok) {
+ return;
+ }
+
+ memcpy(&instance, &msg->instance, sizeof(instance)); /* avoid SIGBUS */
+
+ ok = notifyd_apply_rec_change(
+ &src, msg->path, pathlen, &instance,
+ state->entries, state->sys_notify_watch, state->sys_notify_ctx,
+ state->msg_ctx);
+ if (!ok) {
+ DBG_DEBUG("notifyd_apply_rec_change failed, ignoring\n");
+ return;
+ }
+
+ if ((state->log == NULL) || (state->ctdbd_conn == NULL)) {
+ return;
+ }
+
+#ifdef CLUSTER_SUPPORT
+ {
+
+ struct messaging_rec **tmp;
+ struct messaging_reclog *log;
+ struct iovec iov = { .iov_base = data->data, .iov_len = data->length };
+
+ log = state->log;
+
+ tmp = talloc_realloc(log, log->recs, struct messaging_rec *,
+ log->num_recs+1);
+ if (tmp == NULL) {
+ DBG_WARNING("talloc_realloc failed, ignoring\n");
+ return;
+ }
+ log->recs = tmp;
+
+ log->recs[log->num_recs] = messaging_rec_create(
+ log->recs, src, messaging_server_id(msg_ctx),
+ msg_type, &iov, 1, NULL, 0);
+
+ if (log->recs[log->num_recs] == NULL) {
+ DBG_WARNING("messaging_rec_create failed, ignoring\n");
+ return;
+ }
+
+ log->num_recs += 1;
+
+ if (log->num_recs >= 100) {
+ /*
+ * Don't let the log grow too large
+ */
+ notifyd_broadcast_reclog(state->ctdbd_conn,
+ messaging_server_id(msg_ctx), log);
+ }
+
+ }
+#endif
+}
+
+struct notifyd_trigger_state {
+ struct messaging_context *msg_ctx;
+ struct notify_trigger_msg *msg;
+ bool recursive;
+ bool covered_by_sys_notify;
+};
+
+static void notifyd_trigger_parser(TDB_DATA key, TDB_DATA data,
+ void *private_data);
+
+static void notifyd_trigger(struct messaging_context *msg_ctx,
+ void *private_data, uint32_t msg_type,
+ struct server_id src, DATA_BLOB *data)
+{
+ struct notifyd_state *state = talloc_get_type_abort(
+ private_data, struct notifyd_state);
+ struct server_id my_id = messaging_server_id(msg_ctx);
+ struct notifyd_trigger_state tstate;
+ const char *path;
+ const char *p, *next_p;
+
+ if (data->length < offsetof(struct notify_trigger_msg, path) + 1) {
+ DBG_WARNING("message too short, ignoring: %zu\n",
+ data->length);
+ return;
+ }
+ if (data->data[data->length-1] != 0) {
+ DBG_WARNING("path not 0-terminated, ignoring\n");;
+ return;
+ }
+
+ tstate.msg_ctx = msg_ctx;
+
+ tstate.covered_by_sys_notify = (src.vnn == my_id.vnn);
+ tstate.covered_by_sys_notify &= !server_id_equal(&src, &my_id);
+
+ tstate.msg = (struct notify_trigger_msg *)data->data;
+ path = tstate.msg->path;
+
+ DBG_DEBUG("Got trigger_msg action=%"PRIu32", filter=%"PRIu32", "
+ "path=%s\n",
+ tstate.msg->action,
+ tstate.msg->filter,
+ path);
+
+ if (path[0] != '/') {
+ DBG_WARNING("path %s does not start with /, ignoring\n",
+ path);
+ return;
+ }
+
+ for (p = strchr(path+1, '/'); p != NULL; p = next_p) {
+ ptrdiff_t path_len = p - path;
+ TDB_DATA key;
+ uint32_t i;
+
+ next_p = strchr(p+1, '/');
+ tstate.recursive = (next_p != NULL);
+
+ DBG_DEBUG("Trying path %.*s\n", (int)path_len, path);
+
+ key = (TDB_DATA) { .dptr = discard_const_p(uint8_t, path),
+ .dsize = path_len };
+
+ dbwrap_parse_record(state->entries, key,
+ notifyd_trigger_parser, &tstate);
+
+ if (state->peers == NULL) {
+ continue;
+ }
+
+ if (src.vnn != my_id.vnn) {
+ continue;
+ }
+
+ for (i=0; i<state->num_peers; i++) {
+ if (state->peers[i]->db == NULL) {
+ /*
+ * Inactive peer, did not get a db yet
+ */
+ continue;
+ }
+ dbwrap_parse_record(state->peers[i]->db, key,
+ notifyd_trigger_parser, &tstate);
+ }
+ }
+}
+
+static void notifyd_send_delete(struct messaging_context *msg_ctx,
+ TDB_DATA key,
+ struct notifyd_instance *instance);
+
+static void notifyd_trigger_parser(TDB_DATA key, TDB_DATA data,
+ void *private_data)
+
+{
+ struct notifyd_trigger_state *tstate = private_data;
+ struct notify_event_msg msg = { .action = tstate->msg->action,
+ .when = tstate->msg->when };
+ struct iovec iov[2];
+ size_t path_len = key.dsize;
+ struct notifyd_instance *instances = NULL;
+ size_t num_instances = 0;
+ size_t i;
+
+ if (!notifyd_parse_entry(data.dptr, data.dsize, &instances,
+ &num_instances)) {
+ DBG_DEBUG("Could not parse notifyd_entry\n");
+ return;
+ }
+
+ DBG_DEBUG("Found %zu instances for %.*s\n",
+ num_instances,
+ (int)key.dsize,
+ (char *)key.dptr);
+
+ iov[0].iov_base = &msg;
+ iov[0].iov_len = offsetof(struct notify_event_msg, path);
+ iov[1].iov_base = tstate->msg->path + path_len + 1;
+ iov[1].iov_len = strlen((char *)(iov[1].iov_base)) + 1;
+
+ for (i=0; i<num_instances; i++) {
+ struct notifyd_instance *instance = &instances[i];
+ struct server_id_buf idbuf;
+ uint32_t i_filter;
+ NTSTATUS status;
+
+ if (tstate->covered_by_sys_notify) {
+ if (tstate->recursive) {
+ i_filter = instance->internal_subdir_filter;
+ } else {
+ i_filter = instance->internal_filter;
+ }
+ } else {
+ if (tstate->recursive) {
+ i_filter = instance->instance.subdir_filter;
+ } else {
+ i_filter = instance->instance.filter;
+ }
+ }
+
+ if ((i_filter & tstate->msg->filter) == 0) {
+ continue;
+ }
+
+ msg.private_data = instance->instance.private_data;
+
+ status = messaging_send_iov(
+ tstate->msg_ctx, instance->client,
+ MSG_PVFS_NOTIFY, iov, ARRAY_SIZE(iov), NULL, 0);
+
+ DBG_DEBUG("messaging_send_iov to %s returned %s\n",
+ server_id_str_buf(instance->client, &idbuf),
+ nt_errstr(status));
+
+ if (NT_STATUS_EQUAL(status, NT_STATUS_OBJECT_NAME_NOT_FOUND) &&
+ procid_is_local(&instance->client)) {
+ /*
+ * That process has died
+ */
+ notifyd_send_delete(tstate->msg_ctx, key, instance);
+ continue;
+ }
+
+ if (!NT_STATUS_IS_OK(status)) {
+ DBG_WARNING("messaging_send_iov returned %s\n",
+ nt_errstr(status));
+ }
+ }
+}
+
+/*
+ * Send a delete request to ourselves to properly discard a notify
+ * record for an smbd that has died.
+ */
+
+static void notifyd_send_delete(struct messaging_context *msg_ctx,
+ TDB_DATA key,
+ struct notifyd_instance *instance)
+{
+ struct notify_rec_change_msg msg = {
+ .instance.private_data = instance->instance.private_data
+ };
+ uint8_t nul = 0;
+ struct iovec iov[3];
+ int ret;
+
+ /*
+ * Send a rec_change to ourselves to delete a dead entry
+ */
+
+ iov[0] = (struct iovec) {
+ .iov_base = &msg,
+ .iov_len = offsetof(struct notify_rec_change_msg, path) };
+ iov[1] = (struct iovec) { .iov_base = key.dptr, .iov_len = key.dsize };
+ iov[2] = (struct iovec) { .iov_base = &nul, .iov_len = sizeof(nul) };
+
+ ret = messaging_send_iov_from(
+ msg_ctx, instance->client, messaging_server_id(msg_ctx),
+ MSG_SMB_NOTIFY_REC_CHANGE, iov, ARRAY_SIZE(iov), NULL, 0);
+
+ if (ret != 0) {
+ DBG_WARNING("messaging_send_iov_from returned %s\n",
+ strerror(ret));
+ }
+}
+
+static void notifyd_get_db(struct messaging_context *msg_ctx,
+ void *private_data, uint32_t msg_type,
+ struct server_id src, DATA_BLOB *data)
+{
+ struct notifyd_state *state = talloc_get_type_abort(
+ private_data, struct notifyd_state);
+ struct server_id_buf id1, id2;
+ NTSTATUS status;
+ uint64_t rec_index = UINT64_MAX;
+ uint8_t index_buf[sizeof(uint64_t)];
+ size_t dbsize;
+ uint8_t *buf;
+ struct iovec iov[2];
+
+ dbsize = dbwrap_marshall(state->entries, NULL, 0);
+
+ buf = talloc_array(talloc_tos(), uint8_t, dbsize);
+ if (buf == NULL) {
+ DBG_WARNING("talloc_array(%zu) failed\n", dbsize);
+ return;
+ }
+
+ dbsize = dbwrap_marshall(state->entries, buf, dbsize);
+
+ if (dbsize != talloc_get_size(buf)) {
+ DBG_DEBUG("dbsize changed: %zu->%zu\n",
+ talloc_get_size(buf),
+ dbsize);
+ TALLOC_FREE(buf);
+ return;
+ }
+
+ if (state->log != NULL) {
+ rec_index = state->log->rec_index;
+ }
+ SBVAL(index_buf, 0, rec_index);
+
+ iov[0] = (struct iovec) { .iov_base = index_buf,
+ .iov_len = sizeof(index_buf) };
+ iov[1] = (struct iovec) { .iov_base = buf,
+ .iov_len = dbsize };
+
+ DBG_DEBUG("Sending %zu bytes to %s->%s\n",
+ iov_buflen(iov, ARRAY_SIZE(iov)),
+ server_id_str_buf(messaging_server_id(msg_ctx), &id1),
+ server_id_str_buf(src, &id2));
+
+ status = messaging_send_iov(msg_ctx, src, MSG_SMB_NOTIFY_DB,
+ iov, ARRAY_SIZE(iov), NULL, 0);
+ TALLOC_FREE(buf);
+ if (!NT_STATUS_IS_OK(status)) {
+ DBG_WARNING("messaging_send_iov failed: %s\n",
+ nt_errstr(status));
+ }
+}
+
+#ifdef CLUSTER_SUPPORT
+
+static int notifyd_add_proxy_syswatches(struct db_record *rec,
+ void *private_data);
+
+static void notifyd_got_db(struct messaging_context *msg_ctx,
+ void *private_data, uint32_t msg_type,
+ struct server_id src, DATA_BLOB *data)
+{
+ struct notifyd_state *state = talloc_get_type_abort(
+ private_data, struct notifyd_state);
+ struct notifyd_peer *p = NULL;
+ struct server_id_buf idbuf;
+ NTSTATUS status;
+ int count;
+ size_t i;
+
+ for (i=0; i<state->num_peers; i++) {
+ if (server_id_equal(&src, &state->peers[i]->pid)) {
+ p = state->peers[i];
+ break;
+ }
+ }
+
+ if (p == NULL) {
+ DBG_DEBUG("Did not find peer for db from %s\n",
+ server_id_str_buf(src, &idbuf));
+ return;
+ }
+
+ if (data->length < 8) {
+ DBG_DEBUG("Got short db length %zu from %s\n", data->length,
+ server_id_str_buf(src, &idbuf));
+ TALLOC_FREE(p);
+ return;
+ }
+
+ p->rec_index = BVAL(data->data, 0);
+
+ p->db = db_open_rbt(p);
+ if (p->db == NULL) {
+ DBG_DEBUG("db_open_rbt failed\n");
+ TALLOC_FREE(p);
+ return;
+ }
+
+ status = dbwrap_unmarshall(p->db, data->data + 8,
+ data->length - 8);
+ if (!NT_STATUS_IS_OK(status)) {
+ DBG_DEBUG("dbwrap_unmarshall returned %s for db %s\n",
+ nt_errstr(status),
+ server_id_str_buf(src, &idbuf));
+ TALLOC_FREE(p);
+ return;
+ }
+
+ dbwrap_traverse_read(p->db, notifyd_add_proxy_syswatches, state,
+ &count);
+
+ DBG_DEBUG("Database from %s contained %d records\n",
+ server_id_str_buf(src, &idbuf),
+ count);
+}
+
+static void notifyd_broadcast_reclog(struct ctdbd_connection *ctdbd_conn,
+ struct server_id src,
+ struct messaging_reclog *log)
+{
+ enum ndr_err_code ndr_err;
+ uint8_t msghdr[MESSAGE_HDR_LENGTH];
+ DATA_BLOB blob;
+ struct iovec iov[2];
+ int ret;
+
+ if (log == NULL) {
+ return;
+ }
+
+ DBG_DEBUG("rec_index=%"PRIu64", num_recs=%"PRIu32"\n",
+ log->rec_index,
+ log->num_recs);
+
+ message_hdr_put(msghdr, MSG_SMB_NOTIFY_REC_CHANGES, src,
+ (struct server_id) {0 });
+ iov[0] = (struct iovec) { .iov_base = msghdr,
+ .iov_len = sizeof(msghdr) };
+
+ ndr_err = ndr_push_struct_blob(
+ &blob, log, log,
+ (ndr_push_flags_fn_t)ndr_push_messaging_reclog);
+ if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
+ DBG_WARNING("ndr_push_messaging_recs failed: %s\n",
+ ndr_errstr(ndr_err));
+ goto done;
+ }
+ iov[1] = (struct iovec) { .iov_base = blob.data,
+ .iov_len = blob.length };
+
+ ret = ctdbd_messaging_send_iov(
+ ctdbd_conn, CTDB_BROADCAST_CONNECTED,
+ CTDB_SRVID_SAMBA_NOTIFY_PROXY, iov, ARRAY_SIZE(iov));
+ TALLOC_FREE(blob.data);
+ if (ret != 0) {
+ DBG_WARNING("ctdbd_messaging_send failed: %s\n",
+ strerror(ret));
+ goto done;
+ }
+
+ log->rec_index += 1;
+
+done:
+ log->num_recs = 0;
+ TALLOC_FREE(log->recs);
+}
+
+struct notifyd_broadcast_reclog_state {
+ struct tevent_context *ev;
+ struct ctdbd_connection *ctdbd_conn;
+ struct server_id src;
+ struct messaging_reclog *log;
+};
+
+static void notifyd_broadcast_reclog_next(struct tevent_req *subreq);
+
+static struct tevent_req *notifyd_broadcast_reclog_send(
+ TALLOC_CTX *mem_ctx, struct tevent_context *ev,
+ struct ctdbd_connection *ctdbd_conn, struct server_id src,
+ struct messaging_reclog *log)
+{
+ struct tevent_req *req, *subreq;
+ struct notifyd_broadcast_reclog_state *state;
+
+ req = tevent_req_create(mem_ctx, &state,
+ struct notifyd_broadcast_reclog_state);
+ if (req == NULL) {
+ return NULL;
+ }
+ state->ev = ev;
+ state->ctdbd_conn = ctdbd_conn;
+ state->src = src;
+ state->log = log;
+
+ subreq = tevent_wakeup_send(state, state->ev,
+ timeval_current_ofs_msec(1000));
+ if (tevent_req_nomem(subreq, req)) {
+ return tevent_req_post(req, ev);
+ }
+ tevent_req_set_callback(subreq, notifyd_broadcast_reclog_next, req);
+ return req;
+}
+
+static void notifyd_broadcast_reclog_next(struct tevent_req *subreq)
+{
+ struct tevent_req *req = tevent_req_callback_data(
+ subreq, struct tevent_req);
+ struct notifyd_broadcast_reclog_state *state = tevent_req_data(
+ req, struct notifyd_broadcast_reclog_state);
+ bool ok;
+
+ ok = tevent_wakeup_recv(subreq);
+ TALLOC_FREE(subreq);
+ if (!ok) {
+ tevent_req_oom(req);
+ return;
+ }
+
+ notifyd_broadcast_reclog(state->ctdbd_conn, state->src, state->log);
+
+ subreq = tevent_wakeup_send(state, state->ev,
+ timeval_current_ofs_msec(1000));
+ if (tevent_req_nomem(subreq, req)) {
+ return;
+ }
+ tevent_req_set_callback(subreq, notifyd_broadcast_reclog_next, req);
+}
+
+static int notifyd_broadcast_reclog_recv(struct tevent_req *req)
+{
+ return tevent_req_simple_recv_unix(req);
+}
+
+struct notifyd_clean_peers_state {
+ struct tevent_context *ev;
+ struct notifyd_state *notifyd;
+};
+
+static void notifyd_clean_peers_next(struct tevent_req *subreq);
+
+static struct tevent_req *notifyd_clean_peers_send(
+ TALLOC_CTX *mem_ctx, struct tevent_context *ev,
+ struct notifyd_state *notifyd)
+{
+ struct tevent_req *req, *subreq;
+ struct notifyd_clean_peers_state *state;
+
+ req = tevent_req_create(mem_ctx, &state,
+ struct notifyd_clean_peers_state);
+ if (req == NULL) {
+ return NULL;
+ }
+ state->ev = ev;
+ state->notifyd = notifyd;
+
+ subreq = tevent_wakeup_send(state, state->ev,
+ timeval_current_ofs_msec(30000));
+ if (tevent_req_nomem(subreq, req)) {
+ return tevent_req_post(req, ev);
+ }
+ tevent_req_set_callback(subreq, notifyd_clean_peers_next, req);
+ return req;
+}
+
+static void notifyd_clean_peers_next(struct tevent_req *subreq)
+{
+ struct tevent_req *req = tevent_req_callback_data(
+ subreq, struct tevent_req);
+ struct notifyd_clean_peers_state *state = tevent_req_data(
+ req, struct notifyd_clean_peers_state);
+ struct notifyd_state *notifyd = state->notifyd;
+ size_t i;
+ bool ok;
+ time_t now = time(NULL);
+
+ ok = tevent_wakeup_recv(subreq);
+ TALLOC_FREE(subreq);
+ if (!ok) {
+ tevent_req_oom(req);
+ return;
+ }
+
+ i = 0;
+ while (i < notifyd->num_peers) {
+ struct notifyd_peer *p = notifyd->peers[i];
+
+ if ((now - p->last_broadcast) > 60) {
+ struct server_id_buf idbuf;
+
+ /*
+ * Haven't heard for more than 60 seconds. Call this
+ * peer dead
+ */
+
+ DBG_DEBUG("peer %s died\n",
+ server_id_str_buf(p->pid, &idbuf));
+ /*
+ * This implicitly decrements notifyd->num_peers
+ */
+ TALLOC_FREE(p);
+ } else {
+ i += 1;
+ }
+ }
+
+ subreq = tevent_wakeup_send(state, state->ev,
+ timeval_current_ofs_msec(30000));
+ if (tevent_req_nomem(subreq, req)) {
+ return;
+ }
+ tevent_req_set_callback(subreq, notifyd_clean_peers_next, req);
+}
+
+static int notifyd_clean_peers_recv(struct tevent_req *req)
+{
+ return tevent_req_simple_recv_unix(req);
+}
+
+static int notifyd_add_proxy_syswatches(struct db_record *rec,
+ void *private_data)
+{
+ struct notifyd_state *state = talloc_get_type_abort(
+ private_data, struct notifyd_state);
+ struct db_context *db = dbwrap_record_get_db(rec);
+ TDB_DATA key = dbwrap_record_get_key(rec);
+ TDB_DATA value = dbwrap_record_get_value(rec);
+ struct notifyd_instance *instances = NULL;
+ size_t num_instances = 0;
+ size_t i;
+ char path[key.dsize+1];
+ bool ok;
+
+ memcpy(path, key.dptr, key.dsize);
+ path[key.dsize] = '\0';
+
+ ok = notifyd_parse_entry(value.dptr, value.dsize, &instances,
+ &num_instances);
+ if (!ok) {
+ DBG_WARNING("Could not parse notifyd entry for %s\n", path);
+ return 0;
+ }
+
+ for (i=0; i<num_instances; i++) {
+ struct notifyd_instance *instance = &instances[i];
+ uint32_t filter = instance->instance.filter;
+ uint32_t subdir_filter = instance->instance.subdir_filter;
+ int ret;
+
+ /*
+ * This is a remote database. Pointers that we were
+ * given don't make sense locally. Initialize to NULL
+ * in case sys_notify_watch fails.
+ */
+ instances[i].sys_watch = NULL;
+
+ ret = state->sys_notify_watch(
+ db, state->sys_notify_ctx, path,
+ &filter, &subdir_filter,
+ notifyd_sys_callback, state->msg_ctx,
+ &instance->sys_watch);
+ if (ret != 0) {
+ DBG_WARNING("inotify_watch returned %s\n",
+ strerror(errno));
+ }
+ }
+
+ return 0;
+}
+
+static int notifyd_db_del_syswatches(struct db_record *rec, void *private_data)
+{
+ TDB_DATA key = dbwrap_record_get_key(rec);
+ TDB_DATA value = dbwrap_record_get_value(rec);
+ struct notifyd_instance *instances = NULL;
+ size_t num_instances = 0;
+ size_t i;
+ bool ok;
+
+ ok = notifyd_parse_entry(value.dptr, value.dsize, &instances,
+ &num_instances);
+ if (!ok) {
+ DBG_WARNING("Could not parse notifyd entry for %.*s\n",
+ (int)key.dsize, (char *)key.dptr);
+ return 0;
+ }
+ for (i=0; i<num_instances; i++) {
+ TALLOC_FREE(instances[i].sys_watch);
+ }
+ return 0;
+}
+
+static int notifyd_peer_destructor(struct notifyd_peer *p)
+{
+ struct notifyd_state *state = p->state;
+ size_t i;
+
+ if (p->db != NULL) {
+ dbwrap_traverse_read(p->db, notifyd_db_del_syswatches,
+ NULL, NULL);
+ }
+
+ for (i = 0; i<state->num_peers; i++) {
+ if (p == state->peers[i]) {
+ state->peers[i] = state->peers[state->num_peers-1];
+ state->num_peers -= 1;
+ break;
+ }
+ }
+ return 0;
+}
+
+static struct notifyd_peer *notifyd_peer_new(
+ struct notifyd_state *state, struct server_id pid)
+{
+ struct notifyd_peer *p, **tmp;
+
+ tmp = talloc_realloc(state, state->peers, struct notifyd_peer *,
+ state->num_peers+1);
+ if (tmp == NULL) {
+ return NULL;
+ }
+ state->peers = tmp;
+
+ p = talloc_zero(state->peers, struct notifyd_peer);
+ if (p == NULL) {
+ return NULL;
+ }
+ p->state = state;
+ p->pid = pid;
+
+ state->peers[state->num_peers] = p;
+ state->num_peers += 1;
+
+ talloc_set_destructor(p, notifyd_peer_destructor);
+
+ return p;
+}
+
+static void notifyd_apply_reclog(struct notifyd_peer *peer,
+ const uint8_t *msg, size_t msglen)
+{
+ struct notifyd_state *state = peer->state;
+ DATA_BLOB blob = { .data = discard_const_p(uint8_t, msg),
+ .length = msglen };
+ struct server_id_buf idbuf;
+ struct messaging_reclog *log;
+ enum ndr_err_code ndr_err;
+ uint32_t i;
+
+ if (peer->db == NULL) {
+ /*
+ * No db yet
+ */
+ return;
+ }
+
+ log = talloc(peer, struct messaging_reclog);
+ if (log == NULL) {
+ DBG_DEBUG("talloc failed\n");
+ return;
+ }
+
+ ndr_err = ndr_pull_struct_blob_all(
+ &blob, log, log,
+ (ndr_pull_flags_fn_t)ndr_pull_messaging_reclog);
+ if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
+ DBG_DEBUG("ndr_pull_messaging_reclog failed: %s\n",
+ ndr_errstr(ndr_err));
+ goto fail;
+ }
+
+ DBG_DEBUG("Got %"PRIu32" recs index %"PRIu64" from %s\n",
+ log->num_recs,
+ log->rec_index,
+ server_id_str_buf(peer->pid, &idbuf));
+
+ if (log->rec_index != peer->rec_index) {
+ DBG_INFO("Got rec index %"PRIu64" from %s, "
+ "expected %"PRIu64"\n",
+ log->rec_index,
+ server_id_str_buf(peer->pid, &idbuf),
+ peer->rec_index);
+ goto fail;
+ }
+
+ for (i=0; i<log->num_recs; i++) {
+ struct messaging_rec *r = log->recs[i];
+ struct notify_rec_change_msg *chg;
+ size_t pathlen;
+ bool ok;
+ struct notify_instance instance;
+
+ ok = notifyd_parse_rec_change(r->buf.data, r->buf.length,
+ &chg, &pathlen);
+ if (!ok) {
+ DBG_INFO("notifyd_parse_rec_change failed\n");
+ goto fail;
+ }
+
+ /* avoid SIGBUS */
+ memcpy(&instance, &chg->instance, sizeof(instance));
+
+ ok = notifyd_apply_rec_change(&r->src, chg->path, pathlen,
+ &instance, peer->db,
+ state->sys_notify_watch,
+ state->sys_notify_ctx,
+ state->msg_ctx);
+ if (!ok) {
+ DBG_INFO("notifyd_apply_rec_change failed\n");
+ goto fail;
+ }
+ }
+
+ peer->rec_index += 1;
+ peer->last_broadcast = time(NULL);
+
+ TALLOC_FREE(log);
+ return;
+
+fail:
+ DBG_DEBUG("Dropping peer %s\n",
+ server_id_str_buf(peer->pid, &idbuf));
+ TALLOC_FREE(peer);
+}
+
+/*
+ * Receive messaging_reclog (log of MSG_SMB_NOTIFY_REC_CHANGE
+ * messages) broadcasts by other notifyds. Several cases:
+ *
+ * We don't know the source. This creates a new peer. Creating a peer
+ * involves asking the peer for its full database. We assume ordered
+ * messages, so the new database will arrive before the next broadcast
+ * will.
+ *
+ * We know the source and the log index matches. We will apply the log
+ * locally to our peer's db as if we had received it from a local
+ * client.
+ *
+ * We know the source but the log index does not match. This means we
+ * lost a message. We just drop the whole peer and wait for the next
+ * broadcast, which will then trigger a fresh database pull.
+ */
+
+static int notifyd_snoop_broadcast(struct tevent_context *ev,
+ uint32_t src_vnn, uint32_t dst_vnn,
+ uint64_t dst_srvid,
+ const uint8_t *msg, size_t msglen,
+ void *private_data)
+{
+ struct notifyd_state *state = talloc_get_type_abort(
+ private_data, struct notifyd_state);
+ struct server_id my_id = messaging_server_id(state->msg_ctx);
+ struct notifyd_peer *p;
+ uint32_t i;
+ uint32_t msg_type;
+ struct server_id src, dst;
+ struct server_id_buf idbuf;
+ NTSTATUS status;
+
+ if (msglen < MESSAGE_HDR_LENGTH) {
+ DBG_DEBUG("Got short broadcast\n");
+ return 0;
+ }
+ message_hdr_get(&msg_type, &src, &dst, msg);
+
+ if (msg_type != MSG_SMB_NOTIFY_REC_CHANGES) {
+ DBG_DEBUG("Got message %"PRIu32", ignoring\n", msg_type);
+ return 0;
+ }
+ if (server_id_equal(&src, &my_id)) {
+ DBG_DEBUG("Ignoring my own broadcast\n");
+ return 0;
+ }
+
+ DBG_DEBUG("Got MSG_SMB_NOTIFY_REC_CHANGES from %s\n",
+ server_id_str_buf(src, &idbuf));
+
+ for (i=0; i<state->num_peers; i++) {
+ if (server_id_equal(&state->peers[i]->pid, &src)) {
+
+ DBG_DEBUG("Applying changes to peer %"PRIu32"\n", i);
+
+ notifyd_apply_reclog(state->peers[i],
+ msg + MESSAGE_HDR_LENGTH,
+ msglen - MESSAGE_HDR_LENGTH);
+ return 0;
+ }
+ }
+
+ DBG_DEBUG("Creating new peer for %s\n",
+ server_id_str_buf(src, &idbuf));
+
+ p = notifyd_peer_new(state, src);
+ if (p == NULL) {
+ DBG_DEBUG("notifyd_peer_new failed\n");
+ return 0;
+ }
+
+ status = messaging_send_buf(state->msg_ctx, src, MSG_SMB_NOTIFY_GET_DB,
+ NULL, 0);
+ if (!NT_STATUS_IS_OK(status)) {
+ DBG_DEBUG("messaging_send_buf failed: %s\n",
+ nt_errstr(status));
+ TALLOC_FREE(p);
+ return 0;
+ }
+
+ return 0;
+}
+#endif
diff --git a/source3/smbd/notifyd/notifyd.h b/source3/smbd/notifyd/notifyd.h
new file mode 100644
index 0000000..5ef8c4c
--- /dev/null
+++ b/source3/smbd/notifyd/notifyd.h
@@ -0,0 +1,145 @@
+/*
+ * Unix SMB/CIFS implementation.
+ *
+ * Copyright (C) Volker Lendecke 2014
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#ifndef __NOTIFYD_NOTIFYD_H__
+#define __NOTIFYD_NOTIFYD_H__
+
+#include "includes.h"
+#include "librpc/gen_ndr/notify.h"
+#include "librpc/gen_ndr/messaging.h"
+#include "lib/dbwrap/dbwrap.h"
+#include "lib/dbwrap/dbwrap_rbt.h"
+#include "messages.h"
+#include "tdb.h"
+#include "util_tdb.h"
+
+/*
+ * Filechangenotify based on asynchronous messages
+ *
+ * smbds talk to local notify daemons to inform them about paths they are
+ * interested in. They also tell local notify daemons about changes they have
+ * done to the file system. There's two message types from smbd to
+ * notifyd. The first is used to inform notifyd about changes in notify
+ * interest. These are only sent from smbd to notifyd if the SMB client issues
+ * FileChangeNotify requests.
+ */
+
+/*
+ * The notifyd implementation is designed to cope with multiple daemons taking
+ * care of just a subset of smbds. The goal is to minimize the traffic between
+ * the notify daemons. The idea behind this is a samba/ctdb cluster, but it
+ * could also be used to spread the load of notifyd instances on a single
+ * node, should this become a bottleneck. The following diagram illustrates
+ * the setup. The numbers in the boxes are node:process ids.
+ *
+ * +-----------+ +-----------+
+ * |notifyd 0:5|------------------|notifyd 1:6|
+ * +-----------+ +-----------+
+ * / | \ / \
+ * / | \ / \
+ * +--------+ | +--------+ +--------+ +--------+
+ * |smbd 0:1| | |smbd 0:4| |smbd 1:7| |smbd 1:2|
+ * +--------+ | +--------+ +--------+ +--------+
+ * |
+ * +---------+
+ * |smbd 0:20|
+ * +---------+
+ *
+ * Suppose 0:1 and 0:4 are interested in changes for /foo and 0:20 creates the
+ * file /foo/bar, if everything fully connected, 0:20 would have to send two
+ * local messages, one to 0:1 and one to 0:4. With the notifyd design, 0:20
+ * only has to send one message, it lets notifyd 0:5 do the hard work to
+ * multicast the change to 0:1 and 0:4.
+ *
+ * Now lets assume 1:7 on the other node creates /foo/baz. It tells its
+ * notifyd 1:6 about this change. All 1:6 will know about is that its peer
+ * notifyd 0:5 is interested in the change. Thus it forwards the event to 0:5,
+ * which sees it as if it came from just another local event creator. 0:5 will
+ * multicast the change to 0:1 and 0:4. To prevent notify loops, the message
+ * from 1:6 to 0:5 will carry a "proxied" flag, so that 0:5 will only forward
+ * the event to local clients.
+ */
+
+/*
+ * Data that notifyd maintains per smbd notify instance
+ */
+struct notify_instance {
+ struct timespec creation_time;
+ uint32_t filter;
+ uint32_t subdir_filter;
+ void *private_data;
+};
+
+/* MSG_SMB_NOTIFY_REC_CHANGE payload */
+struct notify_rec_change_msg {
+ struct notify_instance instance;
+ char path[];
+};
+
+/*
+ * The second message from smbd to notifyd is sent whenever an smbd makes a
+ * file system change. It tells notifyd to inform all interested parties about
+ * that change. This is the message that needs to be really fast in smbd
+ * because it is called a lot.
+ */
+
+/* MSG_SMB_NOTIFY_TRIGGER payload */
+struct notify_trigger_msg {
+ struct timespec when;
+ uint32_t action;
+ uint32_t filter;
+ char path[];
+};
+
+/*
+ * In response to a MSG_SMB_NOTIFY_TRIGGER message notifyd walks its database
+ * and sends out the following message to all interested clients
+ */
+
+/* MSG_PVFS_NOTIFY payload */
+struct notify_event_msg {
+ struct timespec when;
+ void *private_data;
+ uint32_t action;
+ char path[];
+};
+
+struct sys_notify_context;
+struct ctdbd_connection;
+
+typedef int (*sys_notify_watch_fn)(TALLOC_CTX *mem_ctx,
+ struct sys_notify_context *ctx,
+ const char *path,
+ uint32_t *filter,
+ uint32_t *subdir_filter,
+ void (*callback)(struct sys_notify_context *ctx,
+ void *private_data,
+ struct notify_event *ev,
+ uint32_t filter),
+ void *private_data,
+ void *handle_p);
+
+struct tevent_req *notifyd_send(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
+ struct messaging_context *msg_ctx,
+ struct ctdbd_connection *ctdbd_conn,
+ sys_notify_watch_fn sys_notify_watch,
+ struct sys_notify_context *sys_notify_ctx);
+int notifyd_recv(struct tevent_req *req);
+
+#endif
diff --git a/source3/smbd/notifyd/notifyd_db.c b/source3/smbd/notifyd/notifyd_db.c
new file mode 100644
index 0000000..1822861
--- /dev/null
+++ b/source3/smbd/notifyd/notifyd_db.c
@@ -0,0 +1,165 @@
+/*
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#include "replace.h"
+#include "lib/util/debug.h"
+#include "lib/util/server_id_db.h"
+#include "notifyd_private.h"
+#include "notifyd_db.h"
+
+struct notifyd_parse_db_state {
+ bool (*fn)(const char *path,
+ struct server_id server,
+ const struct notify_instance *instance,
+ void *private_data);
+ void *private_data;
+};
+
+static bool notifyd_parse_db_parser(TDB_DATA key, TDB_DATA value,
+ void *private_data)
+{
+ struct notifyd_parse_db_state *state = private_data;
+ char path[key.dsize+1];
+ struct notifyd_instance *instances = NULL;
+ size_t num_instances = 0;
+ size_t i;
+ bool ok;
+
+ memcpy(path, key.dptr, key.dsize);
+ path[key.dsize] = 0;
+
+ ok = notifyd_parse_entry(value.dptr, value.dsize, &instances,
+ &num_instances);
+ if (!ok) {
+ DBG_DEBUG("Could not parse entry for path %s\n", path);
+ return true;
+ }
+
+ for (i=0; i<num_instances; i++) {
+ ok = state->fn(path, instances[i].client,
+ &instances[i].instance,
+ state->private_data);
+ if (!ok) {
+ return false;
+ }
+ }
+
+ return true;
+}
+
+static NTSTATUS notifyd_parse_db(
+ const uint8_t *buf,
+ size_t buflen,
+ uint64_t *log_index,
+ bool (*fn)(const char *path,
+ struct server_id server,
+ const struct notify_instance *instance,
+ void *private_data),
+ void *private_data)
+{
+ struct notifyd_parse_db_state state = {
+ .fn = fn, .private_data = private_data
+ };
+ NTSTATUS status;
+
+ if (buflen < 8) {
+ return NT_STATUS_INTERNAL_DB_CORRUPTION;
+ }
+ *log_index = BVAL(buf, 0);
+
+ buf += 8;
+ buflen -= 8;
+
+ status = dbwrap_parse_marshall_buf(
+ buf, buflen, notifyd_parse_db_parser, &state);
+ return status;
+}
+
+NTSTATUS notify_walk(struct messaging_context *msg_ctx,
+ bool (*fn)(const char *path, struct server_id server,
+ const struct notify_instance *instance,
+ void *private_data),
+ void *private_data)
+{
+ struct server_id_db *names_db = NULL;
+ struct server_id notifyd = { .pid = 0, };
+ struct tevent_context *ev = NULL;
+ struct tevent_req *req = NULL;
+ struct messaging_rec *rec = NULL;
+ uint64_t log_idx;
+ NTSTATUS status;
+ int ret;
+ bool ok;
+
+ names_db = messaging_names_db(msg_ctx);
+ ok = server_id_db_lookup_one(names_db, "notify-daemon", &notifyd);
+ if (!ok) {
+ DBG_WARNING("No notify daemon around\n");
+ return NT_STATUS_SERVER_UNAVAILABLE;
+ }
+
+ ev = samba_tevent_context_init(msg_ctx);
+ if (ev == NULL) {
+ return NT_STATUS_NO_MEMORY;
+ }
+
+ req = messaging_read_send(ev, ev, msg_ctx, MSG_SMB_NOTIFY_DB);
+ if (req == NULL) {
+ TALLOC_FREE(ev);
+ return NT_STATUS_NO_MEMORY;
+ }
+
+ ok = tevent_req_set_endtime(req, ev, timeval_current_ofs(10, 0));
+ if (!ok) {
+ TALLOC_FREE(ev);
+ return NT_STATUS_NO_MEMORY;
+ }
+
+ status = messaging_send_buf(
+ msg_ctx, notifyd, MSG_SMB_NOTIFY_GET_DB, NULL, 0);
+ if (!NT_STATUS_IS_OK(status)) {
+ DBG_DEBUG("messaging_send_buf failed: %s\n",
+ nt_errstr(status));
+ TALLOC_FREE(ev);
+ return status;
+ }
+
+ ok = tevent_req_poll(req, ev);
+ if (!ok) {
+ DBG_DEBUG("tevent_req_poll failed\n");
+ TALLOC_FREE(ev);
+ return NT_STATUS_INTERNAL_ERROR;
+ }
+
+ ret = messaging_read_recv(req, ev, &rec);
+ if (ret != 0) {
+ DBG_DEBUG("messaging_read_recv failed: %s\n",
+ strerror(ret));
+ TALLOC_FREE(ev);
+ return map_nt_error_from_unix(ret);
+ }
+
+ status = notifyd_parse_db(
+ rec->buf.data, rec->buf.length, &log_idx, fn, private_data);
+ if (!NT_STATUS_IS_OK(status)) {
+ DBG_DEBUG("notifyd_parse_db failed: %s\n",
+ nt_errstr(status));
+ TALLOC_FREE(ev);
+ return status;
+ }
+
+ TALLOC_FREE(ev);
+ return NT_STATUS_OK;
+}
diff --git a/source3/smbd/notifyd/notifyd_db.h b/source3/smbd/notifyd/notifyd_db.h
new file mode 100644
index 0000000..bd9e226
--- /dev/null
+++ b/source3/smbd/notifyd/notifyd_db.h
@@ -0,0 +1,27 @@
+/*
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#ifndef __NOTIFYD_NOTIFYD_DB_H__
+#define __NOTIFYD_NOTIFYD_DB_H__
+#include "replace.h"
+#include "notifyd.h"
+
+NTSTATUS notify_walk(struct messaging_context *msg_ctx,
+ bool (*fn)(const char *path, struct server_id server,
+ const struct notify_instance *instance,
+ void *private_data),
+ void *private_data);
+
+#endif
diff --git a/source3/smbd/notifyd/notifyd_entry.c b/source3/smbd/notifyd/notifyd_entry.c
new file mode 100644
index 0000000..539010d
--- /dev/null
+++ b/source3/smbd/notifyd/notifyd_entry.c
@@ -0,0 +1,42 @@
+/*
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#include "replace.h"
+#include "lib/util/debug.h"
+#include "notifyd_private.h"
+
+/*
+ * Parse an entry in the notifyd_context->entries database
+ */
+
+bool notifyd_parse_entry(
+ uint8_t *buf,
+ size_t buflen,
+ struct notifyd_instance **instances,
+ size_t *num_instances)
+{
+ if ((buflen % sizeof(struct notifyd_instance)) != 0) {
+ DBG_WARNING("invalid buffer size: %zu\n", buflen);
+ return false;
+ }
+
+ if (instances != NULL) {
+ *instances = (struct notifyd_instance *)buf;
+ }
+ if (num_instances != NULL) {
+ *num_instances = buflen / sizeof(struct notifyd_instance);
+ }
+ return true;
+}
diff --git a/source3/smbd/notifyd/notifyd_private.h b/source3/smbd/notifyd/notifyd_private.h
new file mode 100644
index 0000000..36c08f4
--- /dev/null
+++ b/source3/smbd/notifyd/notifyd_private.h
@@ -0,0 +1,49 @@
+/*
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#ifndef __NOTIFYD_PRIVATE_H__
+#define __NOTIFYD_PRIVATE_H__
+
+#include "replace.h"
+#include "lib/util/server_id.h"
+#include "notifyd.h"
+
+/*
+ * notifyd's representation of a notify instance
+ */
+struct notifyd_instance {
+ struct server_id client;
+ struct notify_instance instance;
+
+ void *sys_watch; /* inotify/fam/etc handle */
+
+ /*
+ * Filters after sys_watch took responsibility of some bits
+ */
+ uint32_t internal_filter;
+ uint32_t internal_subdir_filter;
+};
+
+/*
+ * Parse an entry in the notifyd_context->entries database
+ */
+
+bool notifyd_parse_entry(
+ uint8_t *buf,
+ size_t buflen,
+ struct notifyd_instance **instances,
+ size_t *num_instances);
+
+#endif
diff --git a/source3/smbd/notifyd/notifydd.c b/source3/smbd/notifyd/notifydd.c
new file mode 100644
index 0000000..26bfcd8
--- /dev/null
+++ b/source3/smbd/notifyd/notifydd.c
@@ -0,0 +1,89 @@
+/*
+ * Unix SMB/CIFS implementation.
+ *
+ * Copyright (C) Volker Lendecke 2014
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#include "replace.h"
+#include "notifyd.h"
+#include "lib/messages_ctdb.h"
+#include <tevent.h>
+#include "lib/util/tevent_unix.h"
+
+int main(int argc, const char *argv[])
+{
+ TALLOC_CTX *frame;
+ struct tevent_context *ev;
+ struct messaging_context *msg;
+ struct tevent_req *req;
+ int err, ret;
+ bool ok;
+
+ talloc_enable_leak_report_full();
+
+ frame = talloc_stackframe();
+
+ setup_logging("notifyd", DEBUG_DEFAULT_STDOUT);
+ lp_set_cmdline("log level", "10");
+
+ ok = lp_load_initial_only(get_dyn_CONFIGFILE());
+ if (!ok) {
+ fprintf(stderr, "Can't load %s - run testparm to debug it\n",
+ get_dyn_CONFIGFILE());
+ return 1;
+ }
+
+ ev = samba_tevent_context_init(frame);
+ if (ev == NULL) {
+ fprintf(stderr, "samba_tevent_context_init failed\n");
+ return 1;
+ }
+
+ msg = messaging_init(ev, ev);
+ if (msg == NULL) {
+ fprintf(stderr, "messaging_init failed\n");
+ return 1;
+ }
+
+ if (!lp_load_global(get_dyn_CONFIGFILE())) {
+ fprintf(stderr, "Can't load %s - run testparm to debug it\n",
+ get_dyn_CONFIGFILE());
+ return 1;
+ }
+
+ req = notifyd_send(ev, ev, msg, messaging_ctdb_connection(),
+ NULL, NULL);
+ if (req == NULL) {
+ fprintf(stderr, "notifyd_send failed\n");
+ return 1;
+ }
+
+ ok = tevent_req_poll_unix(req, ev, &err);
+ if (!ok) {
+ fprintf(stderr, "tevent_req_poll_unix failed: %s\n",
+ strerror(err));
+ return 1;
+ }
+
+ ret = notifyd_recv(req);
+
+ printf("notifyd_recv returned %d (%s)\n", ret,
+ ret ? strerror(ret) : "ok");
+
+ TALLOC_FREE(frame);
+
+ return 0;
+}
diff --git a/source3/smbd/notifyd/test_notifyd.c b/source3/smbd/notifyd/test_notifyd.c
new file mode 100644
index 0000000..431da9b
--- /dev/null
+++ b/source3/smbd/notifyd/test_notifyd.c
@@ -0,0 +1,347 @@
+/*
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#include "replace.h"
+#include "fcn_wait.h"
+#include "notifyd.h"
+#include "notifyd_db.h"
+#include "messages.h"
+#include "lib/util/server_id.h"
+#include "lib/util/server_id_db.h"
+#include "lib/util/tevent_ntstatus.h"
+#include "lib/torture/torture.h"
+#include "torture/local/proto.h"
+#include "lib/param/loadparm.h"
+#include "source3/param/loadparm.h"
+#include "source4/torture/smbtorture.h"
+
+struct fcn_test_state {
+ struct tevent_req *fcn_req;
+ bool got_trigger;
+};
+
+static void fcn_test_done(struct tevent_req *subreq);
+
+static struct tevent_req *fcn_test_send(
+ TALLOC_CTX *mem_ctx,
+ struct tevent_context *ev,
+ struct messaging_context *msg_ctx,
+ struct server_id notifyd,
+ const char *fcn_path,
+ uint32_t fcn_filter,
+ uint32_t fcn_subdir_filter,
+ const char *trigger_path,
+ uint32_t trigger_action,
+ uint32_t trigger_filter)
+{
+ struct tevent_req *req = NULL;
+ struct fcn_test_state *state = NULL;
+ struct notify_trigger_msg msg;
+ struct iovec iov[2];
+ NTSTATUS status;
+
+ req = tevent_req_create(mem_ctx, &state, struct fcn_test_state);
+ if (req == NULL) {
+ return NULL;
+ }
+
+ state->fcn_req = fcn_wait_send(
+ state,
+ ev,
+ msg_ctx,
+ notifyd,
+ fcn_path,
+ fcn_filter,
+ fcn_subdir_filter);
+ if (tevent_req_nomem(state->fcn_req, req)) {
+ return tevent_req_post(req, ev);
+ }
+ tevent_req_set_callback(state->fcn_req, fcn_test_done, req);
+
+ msg = (struct notify_trigger_msg) {
+ .when = timespec_current(),
+ .action = trigger_action,
+ .filter = trigger_filter,
+ };
+ iov[0] = (struct iovec) {
+ .iov_base = &msg,
+ .iov_len = offsetof(struct notify_trigger_msg, path),
+ };
+ iov[1] = (struct iovec) {
+ .iov_base = discard_const_p(char, trigger_path),
+ .iov_len = strlen(trigger_path)+1,
+ };
+
+ status = messaging_send_iov(
+ msg_ctx,
+ notifyd,
+ MSG_SMB_NOTIFY_TRIGGER,
+ iov,
+ ARRAY_SIZE(iov),
+ NULL,
+ 0);
+ if (tevent_req_nterror(req, status)) {
+ return tevent_req_post(req, ev);
+ }
+
+ return req;
+}
+
+static void fcn_test_done(struct tevent_req *subreq)
+{
+ struct tevent_req *req = tevent_req_callback_data(
+ subreq, struct tevent_req);
+ struct fcn_test_state *state = tevent_req_data(
+ req, struct fcn_test_state);
+ NTSTATUS status;
+ bool ok;
+
+ SMB_ASSERT(subreq == state->fcn_req);
+
+ status = fcn_wait_recv(subreq, NULL, NULL, NULL, NULL);
+
+ if (NT_STATUS_EQUAL(status, NT_STATUS_CANCELLED)) {
+ TALLOC_FREE(subreq);
+ state->fcn_req = NULL;
+ tevent_req_done(req);
+ return;
+ }
+
+ if (tevent_req_nterror(req, status)) {
+ TALLOC_FREE(subreq);
+ state->fcn_req = NULL;
+ return;
+ }
+
+ state->got_trigger = true;
+
+ ok = tevent_req_cancel(subreq);
+ if (!ok) {
+ tevent_req_nterror(req, NT_STATUS_INTERNAL_ERROR);
+ return;
+ }
+}
+
+static NTSTATUS fcn_test_recv(struct tevent_req *req, bool *got_trigger)
+{
+ struct fcn_test_state *state = tevent_req_data(
+ req, struct fcn_test_state);
+ NTSTATUS status;
+
+ if (tevent_req_is_nterror(req, &status)) {
+ return status;
+ }
+ if (got_trigger != NULL) {
+ *got_trigger = state->got_trigger;
+ }
+
+ return NT_STATUS_OK;
+}
+
+static NTSTATUS fcn_test(
+ struct messaging_context *msg_ctx,
+ struct server_id notifyd,
+ const char *fcn_path,
+ uint32_t fcn_filter,
+ uint32_t fcn_subdir_filter,
+ const char *trigger_path,
+ uint32_t trigger_action,
+ uint32_t trigger_filter,
+ bool *got_trigger)
+{
+ struct tevent_context *ev = NULL;
+ struct tevent_req *req = NULL;
+ NTSTATUS status = NT_STATUS_NO_MEMORY;
+
+ ev = samba_tevent_context_init(msg_ctx);
+ if (ev == NULL) {
+ goto fail;
+ }
+ req = fcn_test_send(
+ ev,
+ ev,
+ msg_ctx,
+ notifyd,
+ fcn_path,
+ fcn_filter,
+ fcn_subdir_filter,
+ trigger_path,
+ trigger_action,
+ trigger_filter);
+ if (req == NULL) {
+ goto fail;
+ }
+ if (!tevent_req_poll_ntstatus(req, ev, &status)) {
+ goto fail;
+ }
+ status = fcn_test_recv(req, got_trigger);
+fail:
+ TALLOC_FREE(ev);
+ return status;
+}
+
+static bool test_notifyd_trigger1(struct torture_context *tctx)
+{
+ struct messaging_context *msg_ctx = NULL;
+ struct server_id_db *names = NULL;
+ struct server_id notifyd;
+ NTSTATUS status;
+ bool got_trigger = false;
+ bool ok;
+
+ /*
+ * Basic filechangenotify test: Wait for /home, trigger on
+ * /home/foo, check an event was received
+ */
+
+ lp_load_global(tctx->lp_ctx->szConfigFile);
+
+ msg_ctx = messaging_init(tctx, tctx->ev);
+ torture_assert_not_null(tctx, msg_ctx, "messaging_init");
+
+ names = messaging_names_db(msg_ctx);
+ ok = server_id_db_lookup_one(names, "notify-daemon", &notifyd);
+ torture_assert(tctx, ok, "server_id_db_lookup_one");
+
+ status = fcn_test(
+ msg_ctx,
+ notifyd,
+ "/home",
+ UINT32_MAX,
+ UINT32_MAX,
+ "/home/foo",
+ UINT32_MAX,
+ UINT32_MAX,
+ &got_trigger);
+ torture_assert_ntstatus_ok(tctx, status, "fcn_test");
+ torture_assert(tctx, got_trigger, "got_trigger");
+
+ return true;
+}
+
+struct notifyd_have_state {
+ struct server_id self;
+ bool found;
+};
+
+static bool notifyd_have_fn(
+ const char *path,
+ struct server_id server,
+ const struct notify_instance *instance,
+ void *private_data)
+{
+ struct notifyd_have_state *state = private_data;
+ state->found |= server_id_equal(&server, &state->self);
+ return true;
+}
+
+static bool notifyd_have_self(struct messaging_context *msg_ctx)
+{
+ struct notifyd_have_state state = {
+ .self = messaging_server_id(msg_ctx),
+ };
+ NTSTATUS status;
+
+ status = notify_walk(msg_ctx, notifyd_have_fn, &state);
+ if (!NT_STATUS_IS_OK(status)) {
+ return false;
+ }
+ return state.found;
+}
+
+static bool test_notifyd_dbtest1(struct torture_context *tctx)
+{
+ struct tevent_context *ev = tctx->ev;
+ struct messaging_context *msg_ctx = NULL;
+ struct tevent_req *req = NULL;
+ struct server_id_db *names = NULL;
+ struct server_id notifyd;
+ NTSTATUS status;
+ bool ok;
+
+ /*
+ * Make sure fcn_wait_send adds us to the notifyd internal
+ * database and that cancelling the fcn request removes us
+ * again.
+ */
+
+ lp_load_global(tctx->lp_ctx->szConfigFile);
+
+ msg_ctx = messaging_init(tctx, ev);
+ torture_assert_not_null(tctx, msg_ctx, "messaging_init");
+
+ names = messaging_names_db(msg_ctx);
+ ok = server_id_db_lookup_one(names, "notify-daemon", &notifyd);
+ torture_assert(tctx, ok, "server_id_db_lookup_one");
+
+ req = fcn_wait_send(
+ msg_ctx, ev, msg_ctx, notifyd, "/x", UINT32_MAX, UINT32_MAX);
+ torture_assert_not_null(tctx, req, "fcn_wait_send");
+
+ ok = notifyd_have_self(msg_ctx);
+ torture_assert(tctx, ok, "notifyd_have_self");
+
+ ok = tevent_req_cancel(req);
+ torture_assert(tctx, ok, "tevent_req_cancel");
+
+ ok = tevent_req_poll(req, ev);
+ torture_assert(tctx, ok, "tevent_req_poll");
+
+ status = fcn_wait_recv(req, NULL, NULL, NULL, NULL);
+ torture_assert_ntstatus_equal(
+ tctx, status, NT_STATUS_CANCELLED, "fcn_wait_recv");
+ TALLOC_FREE(req);
+
+ ok = notifyd_have_self(msg_ctx);
+ torture_assert(tctx, !ok, "tevent_req_poll");
+ TALLOC_FREE(msg_ctx);
+
+ return true;
+}
+
+NTSTATUS torture_notifyd_init(TALLOC_CTX *mem_ctx);
+NTSTATUS torture_notifyd_init(TALLOC_CTX *mem_ctx)
+{
+ struct torture_suite *suite = NULL;
+ struct torture_tcase *tcase = NULL;
+ bool ok;
+
+ suite = torture_suite_create(mem_ctx, "notifyd");
+ if (suite == NULL) {
+ goto fail;
+ }
+
+ tcase = torture_suite_add_simple_test(
+ suite, "trigger1", test_notifyd_trigger1);
+ if (tcase == NULL) {
+ goto fail;
+ }
+
+ tcase = torture_suite_add_simple_test(
+ suite, "dbtest1", test_notifyd_dbtest1);
+ if (tcase == NULL) {
+ goto fail;
+ }
+ suite->description = "notifyd unit tests";
+
+ ok = torture_register_suite(mem_ctx, suite);
+ if (!ok) {
+ goto fail;
+ }
+ return NT_STATUS_OK;
+fail:
+ TALLOC_FREE(suite);
+ return NT_STATUS_NO_MEMORY;
+}
diff --git a/source3/smbd/notifyd/tests.c b/source3/smbd/notifyd/tests.c
new file mode 100644
index 0000000..6bcce6a
--- /dev/null
+++ b/source3/smbd/notifyd/tests.c
@@ -0,0 +1,118 @@
+/*
+ * Unix SMB/CIFS implementation.
+ *
+ * Copyright (C) Volker Lendecke 2014
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#include "replace.h"
+#include "notifyd.h"
+#include "messages.h"
+#include "lib/util/server_id_db.h"
+
+int main(int argc, const char *argv[])
+{
+ TALLOC_CTX *frame = talloc_stackframe();
+ struct tevent_context *ev;
+ struct messaging_context *msg_ctx;
+ struct server_id_db *names;
+ struct server_id notifyd;
+ struct tevent_req *req;
+ unsigned i;
+ bool ok;
+
+ if (argc != 2) {
+ fprintf(stderr, "Usage: %s <smb.conf-file>\n", argv[0]);
+ exit(1);
+ }
+
+ setup_logging(argv[0], DEBUG_STDOUT);
+ lp_load_global(argv[1]);
+
+ ev = tevent_context_init(NULL);
+ if (ev == NULL) {
+ fprintf(stderr, "tevent_context_init failed\n");
+ exit(1);
+ }
+
+ msg_ctx = messaging_init(ev, ev);
+ if (msg_ctx == NULL) {
+ fprintf(stderr, "messaging_init failed\n");
+ exit(1);
+ }
+
+ names = messaging_names_db(msg_ctx);
+
+ ok = server_id_db_lookup_one(names, "notify-daemon", &notifyd);
+ if (!ok) {
+ fprintf(stderr, "no notifyd\n");
+ exit(1);
+ }
+
+ for (i=0; i<50000; i++) {
+ struct notify_rec_change_msg msg = {
+ .instance.filter = UINT32_MAX,
+ .instance.subdir_filter = UINT32_MAX
+ };
+ char path[64];
+ size_t len;
+ struct iovec iov[2];
+ NTSTATUS status;
+
+ len = snprintf(path, sizeof(path), "/tmp%u", i);
+
+ iov[0].iov_base = &msg;
+ iov[0].iov_len = offsetof(struct notify_rec_change_msg, path);
+ iov[1].iov_base = path;
+ iov[1].iov_len = len+1;
+
+ status = messaging_send_iov(
+ msg_ctx, notifyd, MSG_SMB_NOTIFY_REC_CHANGE,
+ iov, ARRAY_SIZE(iov), NULL, 0);
+ if (!NT_STATUS_IS_OK(status)) {
+ fprintf(stderr, "messaging_send_iov returned %s\n",
+ nt_errstr(status));
+ exit(1);
+ }
+
+ msg.instance.filter = 0;
+ msg.instance.subdir_filter = 0;
+
+ status = messaging_send_iov(
+ msg_ctx, notifyd, MSG_SMB_NOTIFY_REC_CHANGE,
+ iov, ARRAY_SIZE(iov), NULL, 0);
+ if (!NT_STATUS_IS_OK(status)) {
+ fprintf(stderr, "messaging_send_iov returned %s\n",
+ nt_errstr(status));
+ exit(1);
+ }
+ }
+
+ req = messaging_read_send(ev, ev, msg_ctx, MSG_PONG);
+ if (req == NULL) {
+ fprintf(stderr, "messaging_read_send failed\n");
+ exit(1);
+ }
+ messaging_send_buf(msg_ctx, notifyd, MSG_PING, NULL, 0);
+
+ ok = tevent_req_poll(req, ev);
+ if (!ok) {
+ fprintf(stderr, "tevent_req_poll failed\n");
+ exit(1);
+ }
+
+ TALLOC_FREE(frame);
+ return 0;
+}
diff --git a/source3/smbd/notifyd/wscript_build b/source3/smbd/notifyd/wscript_build
new file mode 100644
index 0000000..6880a31
--- /dev/null
+++ b/source3/smbd/notifyd/wscript_build
@@ -0,0 +1,44 @@
+#!/usr/bin/env python
+
+bld.SAMBA3_SUBSYSTEM('fcn_wait',
+ source='fcn_wait.c',
+ deps='samba3core')
+
+bld.SAMBA3_SUBSYSTEM('notifyd_db',
+ source='notifyd_entry.c notifyd_db.c',
+ deps='samba-debug dbwrap errors3')
+
+bld.SAMBA3_SUBSYSTEM('notifyd',
+ source='notifyd.c',
+ deps='''
+ util_tdb
+ TDB_LIB
+ messages_util
+ notifyd_db
+ ''')
+
+bld.SAMBA3_BINARY('notifyd-tests',
+ source='tests.c',
+ install=False,
+ deps='''
+ smbconf
+ ''')
+
+bld.SAMBA3_BINARY('notifydd',
+ source='notifydd.c',
+ install=False,
+ deps='''notifyd
+ smbconf
+ ''')
+
+TORTURE_NOTIFYD_SOURCE='test_notifyd.c'
+TORTURE_NOTIFYD_DEPS='fcn_wait notifyd_db'
+
+bld.SAMBA_MODULE('TORTURE_NOTIFYD',
+ source=TORTURE_NOTIFYD_SOURCE,
+ subsystem='smbtorture',
+ init_function='torture_notifyd_init',
+ deps=TORTURE_NOTIFYD_DEPS,
+ internal_module=True,
+ enabled=bld.PYTHON_BUILD_IS_ENABLED()
+ )