diff options
Diffstat (limited to 'source3/smbd/notifyd')
-rw-r--r-- | source3/smbd/notifyd/fcn_wait.c | 270 | ||||
-rw-r--r-- | source3/smbd/notifyd/fcn_wait.h | 38 | ||||
-rw-r--r-- | source3/smbd/notifyd/notifyd.c | 1428 | ||||
-rw-r--r-- | source3/smbd/notifyd/notifyd.h | 145 | ||||
-rw-r--r-- | source3/smbd/notifyd/notifyd_db.c | 165 | ||||
-rw-r--r-- | source3/smbd/notifyd/notifyd_db.h | 27 | ||||
-rw-r--r-- | source3/smbd/notifyd/notifyd_entry.c | 42 | ||||
-rw-r--r-- | source3/smbd/notifyd/notifyd_private.h | 49 | ||||
-rw-r--r-- | source3/smbd/notifyd/notifydd.c | 89 | ||||
-rw-r--r-- | source3/smbd/notifyd/test_notifyd.c | 347 | ||||
-rw-r--r-- | source3/smbd/notifyd/tests.c | 118 | ||||
-rw-r--r-- | source3/smbd/notifyd/wscript_build | 44 |
12 files changed, 2762 insertions, 0 deletions
diff --git a/source3/smbd/notifyd/fcn_wait.c b/source3/smbd/notifyd/fcn_wait.c new file mode 100644 index 0000000..e32240d --- /dev/null +++ b/source3/smbd/notifyd/fcn_wait.c @@ -0,0 +1,270 @@ +/* + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +#include "replace.h" +#include "fcn_wait.h" +#include "notifyd.h" +#include "lib/util/tevent_ntstatus.h" + +struct fcn_event { + struct fcn_event *prev, *next; + struct notify_event_msg msg; +}; + +struct fcn_wait_state { + struct tevent_context *ev; + struct messaging_context *msg_ctx; + struct server_id notifyd; + const char *path; + + struct tevent_req *recv_subreq; + + struct fcn_event *events; +}; + +static bool fcn_wait_cancel(struct tevent_req *req); +static void fcn_wait_cleanup( + struct tevent_req *req, enum tevent_req_state req_state); +static bool fcn_wait_filter(struct messaging_rec *rec, void *private_data); +static void fcn_wait_done(struct tevent_req *subreq); + +struct tevent_req *fcn_wait_send( + TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + struct messaging_context *msg_ctx, + struct server_id notifyd, + const char *path, + uint32_t filter, + uint32_t subdir_filter) +{ + struct tevent_req *req = NULL; + struct fcn_wait_state *state = NULL; + struct notify_rec_change_msg msg = { + .instance.filter = filter, + .instance.subdir_filter = subdir_filter, + }; + struct iovec iov[2]; + NTSTATUS status; + + req = tevent_req_create(mem_ctx, &state, struct fcn_wait_state); + if (req == NULL) { + return NULL; + } + state->ev = ev; + state->msg_ctx = msg_ctx; + state->notifyd = notifyd; + state->path = path; + + state->recv_subreq = messaging_filtered_read_send( + state, ev, msg_ctx, fcn_wait_filter, req); + if (tevent_req_nomem(state->recv_subreq, req)) { + return tevent_req_post(req, ev); + } + tevent_req_set_callback(state->recv_subreq, fcn_wait_done, req); + tevent_req_set_cleanup_fn(req, fcn_wait_cleanup); + + clock_gettime_mono(&msg.instance.creation_time); + msg.instance.private_data = state; + + iov[0].iov_base = &msg; + iov[0].iov_len = offsetof(struct notify_rec_change_msg, path); + iov[1].iov_base = discard_const_p(char, path); + iov[1].iov_len = strlen(path)+1; + + status = messaging_send_iov( + msg_ctx, /* msg_ctx */ + notifyd, /* dst */ + MSG_SMB_NOTIFY_REC_CHANGE, /* mst_type */ + iov, /* iov */ + ARRAY_SIZE(iov), /* iovlen */ + NULL, /* fds */ + 0); /* num_fds */ + if (tevent_req_nterror(req, status)) { + DBG_DEBUG("messaging_send_iov failed: %s\n", + nt_errstr(status)); + return tevent_req_post(req, ev); + } + tevent_req_set_cancel_fn(req, fcn_wait_cancel); + + return req; +} + +static bool fcn_wait_cancel(struct tevent_req *req) +{ + struct fcn_wait_state *state = tevent_req_data( + req, struct fcn_wait_state); + struct notify_rec_change_msg msg = { + .instance.filter = 0, /* filter==0 is a delete msg */ + .instance.subdir_filter = 0, + }; + struct iovec iov[2]; + NTSTATUS status; + + clock_gettime_mono(&msg.instance.creation_time); + msg.instance.private_data = state; + + iov[0].iov_base = &msg; + iov[0].iov_len = offsetof(struct notify_rec_change_msg, path); + iov[1].iov_base = discard_const_p(char, state->path); + iov[1].iov_len = strlen(state->path)+1; + + status = messaging_send_iov( + state->msg_ctx, /* msg_ctx */ + state->notifyd, /* dst */ + MSG_SMB_NOTIFY_REC_CHANGE, /* mst_type */ + iov, /* iov */ + ARRAY_SIZE(iov), /* iovlen */ + NULL, /* fds */ + 0); /* num_fds */ + if (!NT_STATUS_IS_OK(status)) { + DBG_DEBUG("messaging_send_iov failed: %s\n", + nt_errstr(status)); + return false; + } + + fcn_wait_cleanup(req, 0); /* fcn_wait_cleanup ignores req_state */ + tevent_req_defer_callback(req, state->ev); + tevent_req_nterror(req, NT_STATUS_CANCELLED); + + return true; +} + +static void fcn_wait_cleanup( + struct tevent_req *req, enum tevent_req_state req_state) +{ + struct fcn_wait_state *state = tevent_req_data( + req, struct fcn_wait_state); + TALLOC_FREE(state->recv_subreq); +} + +static bool fcn_wait_filter(struct messaging_rec *rec, void *private_data) +{ + struct tevent_req *req = talloc_get_type_abort( + private_data, struct tevent_req); + struct fcn_wait_state *state = tevent_req_data( + req, struct fcn_wait_state); + struct notify_event_msg msg = { .action = 0 }; + struct fcn_event *evt = NULL; + + if (rec->msg_type != MSG_PVFS_NOTIFY) { + DBG_DEBUG("Ignoring msg %"PRIu32"\n", rec->msg_type); + return false; + } + + /* + * We need at least the trailing '\0' for the path + */ + if (rec->buf.length < (offsetof(struct notify_event_msg, path) + 1)) { + DBG_DEBUG("Ignoring short (%zu) msg\n", rec->buf.length); + return false; + } + if (rec->buf.data[rec->buf.length-1] != '\0') { + DBG_DEBUG("Expected 0-terminated path\n"); + return false; + } + + memcpy(&msg, rec->buf.data, sizeof(msg)); + + if (msg.private_data != state) { + DBG_DEBUG("Got private_data=%p, expected %p\n", + msg.private_data, + state); + return false; + } + + evt = talloc_memdup(state, rec->buf.data, rec->buf.length); + if (evt == NULL) { + DBG_DEBUG("talloc_memdup failed\n"); + return false; + } + talloc_set_name_const(evt, "struct fcn_event"); + + /* + * TODO: Sort by timestamp + */ + + DLIST_ADD_END(state->events, evt); + + tevent_req_defer_callback(req, state->ev); + tevent_req_notify_callback(req); + + return false; +} + +static void fcn_wait_done(struct tevent_req *subreq) +{ + struct tevent_req *req = tevent_req_callback_data( + subreq, struct tevent_req); + int ret; + + ret = messaging_filtered_read_recv(subreq, NULL, NULL); + TALLOC_FREE(subreq); + if (ret != 0) { + DBG_DEBUG("messaging_filtered_read failed: %s\n", + strerror(ret)); + tevent_req_nterror(req, map_nt_error_from_unix(ret)); + return; + } + + /* + * We should never have gotten here, all work is done from the + * filter function. + */ + tevent_req_nterror(req, NT_STATUS_INTERNAL_ERROR); +} + +NTSTATUS fcn_wait_recv( + struct tevent_req *req, + TALLOC_CTX *mem_ctx, + struct timespec *when, + uint32_t *action, + char **path) +{ + struct fcn_wait_state *state = tevent_req_data( + req, struct fcn_wait_state); + struct fcn_event *evt = NULL; + NTSTATUS status; + + if (!tevent_req_is_in_progress(req) && + tevent_req_is_nterror(req, &status)) { + return status; + } + evt = state->events; + if (evt == NULL) { + return NT_STATUS_RETRY; + } + + if (path != NULL) { + *path = talloc_strdup(mem_ctx, evt->msg.path); + if ((*path) == NULL) { + return NT_STATUS_NO_MEMORY; + } + } + if (when != NULL) { + *when = evt->msg.when; + } + if (action != NULL) { + *action = evt->msg.action; + } + + DLIST_REMOVE(state->events, evt); + + if (state->events != NULL) { + tevent_req_defer_callback(req, state->ev); + tevent_req_notify_callback(req); + } + + return NT_STATUS_OK; +} diff --git a/source3/smbd/notifyd/fcn_wait.h b/source3/smbd/notifyd/fcn_wait.h new file mode 100644 index 0000000..daadee4 --- /dev/null +++ b/source3/smbd/notifyd/fcn_wait.h @@ -0,0 +1,38 @@ +/* + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +#ifndef __NOTIFYD_FCN_WAIT_H__ +#define __NOTIFYD_FCN_WAIT_H__ + +#include "replace.h" +#include "messages.h" +#include "librpc/gen_ndr/server_id.h" + +struct tevent_req *fcn_wait_send( + TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + struct messaging_context *msg_ctx, + struct server_id notifyd, + const char *path, + uint32_t filter, + uint32_t subdir_filter); +NTSTATUS fcn_wait_recv( + struct tevent_req *req, + TALLOC_CTX *mem_ctx, + struct timespec *when, + uint32_t *action, + char **path); + +#endif diff --git a/source3/smbd/notifyd/notifyd.c b/source3/smbd/notifyd/notifyd.c new file mode 100644 index 0000000..475c8e9 --- /dev/null +++ b/source3/smbd/notifyd/notifyd.c @@ -0,0 +1,1428 @@ +/* + * Unix SMB/CIFS implementation. + * + * Copyright (C) Volker Lendecke 2014 + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +#include "replace.h" +#include <tevent.h> +#include "notifyd_private.h" +#include "lib/util/server_id.h" +#include "lib/util/data_blob.h" +#include "librpc/gen_ndr/notify.h" +#include "librpc/gen_ndr/messaging.h" +#include "librpc/gen_ndr/server_id.h" +#include "lib/dbwrap/dbwrap.h" +#include "lib/dbwrap/dbwrap_rbt.h" +#include "messages.h" +#include "tdb.h" +#include "util_tdb.h" +#include "notifyd.h" +#include "lib/util/server_id_db.h" +#include "lib/util/tevent_unix.h" +#include "lib/util/tevent_ntstatus.h" +#include "ctdbd_conn.h" +#include "ctdb_srvids.h" +#include "server_id_db_util.h" +#include "lib/util/iov_buf.h" +#include "messages_util.h" + +#ifdef CLUSTER_SUPPORT +#include "ctdb_protocol.h" +#endif + +struct notifyd_peer; + +/* + * All of notifyd's state + */ + +struct notifyd_state { + struct tevent_context *ev; + struct messaging_context *msg_ctx; + struct ctdbd_connection *ctdbd_conn; + + /* + * Database of everything clients show interest in. Indexed by + * absolute path. The database keys are not 0-terminated + * to allow the criticial operation, notifyd_trigger, to walk + * the structure from the top without adding intermediate 0s. + * The database records contain an array of + * + * struct notifyd_instance + * + * to be maintained and parsed by notifyd_parse_entry() + */ + struct db_context *entries; + + /* + * In the cluster case, this is the place where we store a log + * of all MSG_SMB_NOTIFY_REC_CHANGE messages. We just 1:1 + * forward them to our peer notifyd's in the cluster once a + * second or when the log grows too large. + */ + + struct messaging_reclog *log; + + /* + * Array of companion notifyd's in a cluster. Every notifyd + * broadcasts its messaging_reclog to every other notifyd in + * the cluster. This is done by making ctdb send a message to + * srvid CTDB_SRVID_SAMBA_NOTIFY_PROXY with destination node + * number CTDB_BROADCAST_CONNECTED. Everybody in the cluster who + * had called register_with_ctdbd this srvid will receive the + * broadcasts. + * + * Database replication happens via these broadcasts. Also, + * they serve as liveness indication. If a notifyd receives a + * broadcast from an unknown peer, it will create one for this + * srvid. Also when we don't hear anything from a peer for a + * while, we will discard it. + */ + + struct notifyd_peer **peers; + size_t num_peers; + + sys_notify_watch_fn sys_notify_watch; + struct sys_notify_context *sys_notify_ctx; +}; + +struct notifyd_peer { + struct notifyd_state *state; + struct server_id pid; + uint64_t rec_index; + struct db_context *db; + time_t last_broadcast; +}; + +static void notifyd_rec_change(struct messaging_context *msg_ctx, + void *private_data, uint32_t msg_type, + struct server_id src, DATA_BLOB *data); +static void notifyd_trigger(struct messaging_context *msg_ctx, + void *private_data, uint32_t msg_type, + struct server_id src, DATA_BLOB *data); +static void notifyd_get_db(struct messaging_context *msg_ctx, + void *private_data, uint32_t msg_type, + struct server_id src, DATA_BLOB *data); + +#ifdef CLUSTER_SUPPORT +static void notifyd_got_db(struct messaging_context *msg_ctx, + void *private_data, uint32_t msg_type, + struct server_id src, DATA_BLOB *data); +static void notifyd_broadcast_reclog(struct ctdbd_connection *ctdbd_conn, + struct server_id src, + struct messaging_reclog *log); +#endif +static void notifyd_sys_callback(struct sys_notify_context *ctx, + void *private_data, struct notify_event *ev, + uint32_t filter); + +#ifdef CLUSTER_SUPPORT +static struct tevent_req *notifyd_broadcast_reclog_send( + TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdbd_connection *ctdbd_conn, struct server_id src, + struct messaging_reclog *log); +static int notifyd_broadcast_reclog_recv(struct tevent_req *req); + +static struct tevent_req *notifyd_clean_peers_send( + TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct notifyd_state *notifyd); +static int notifyd_clean_peers_recv(struct tevent_req *req); +#endif + +static int sys_notify_watch_dummy( + TALLOC_CTX *mem_ctx, + struct sys_notify_context *ctx, + const char *path, + uint32_t *filter, + uint32_t *subdir_filter, + void (*callback)(struct sys_notify_context *ctx, + void *private_data, + struct notify_event *ev, + uint32_t filter), + void *private_data, + void *handle_p) +{ + void **handle = handle_p; + *handle = NULL; + return 0; +} + +#ifdef CLUSTER_SUPPORT +static void notifyd_broadcast_reclog_finished(struct tevent_req *subreq); +static void notifyd_clean_peers_finished(struct tevent_req *subreq); +static int notifyd_snoop_broadcast(struct tevent_context *ev, + uint32_t src_vnn, uint32_t dst_vnn, + uint64_t dst_srvid, + const uint8_t *msg, size_t msglen, + void *private_data); +#endif + +struct tevent_req *notifyd_send(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct messaging_context *msg_ctx, + struct ctdbd_connection *ctdbd_conn, + sys_notify_watch_fn sys_notify_watch, + struct sys_notify_context *sys_notify_ctx) +{ + struct tevent_req *req; +#ifdef CLUSTER_SUPPORT + struct tevent_req *subreq; +#endif + struct notifyd_state *state; + struct server_id_db *names_db; + NTSTATUS status; + int ret; + + req = tevent_req_create(mem_ctx, &state, struct notifyd_state); + if (req == NULL) { + return NULL; + } + state->ev = ev; + state->msg_ctx = msg_ctx; + state->ctdbd_conn = ctdbd_conn; + + if (sys_notify_watch == NULL) { + sys_notify_watch = sys_notify_watch_dummy; + } + + state->sys_notify_watch = sys_notify_watch; + state->sys_notify_ctx = sys_notify_ctx; + + state->entries = db_open_rbt(state); + if (tevent_req_nomem(state->entries, req)) { + return tevent_req_post(req, ev); + } + + status = messaging_register(msg_ctx, state, MSG_SMB_NOTIFY_REC_CHANGE, + notifyd_rec_change); + if (tevent_req_nterror(req, status)) { + return tevent_req_post(req, ev); + } + + status = messaging_register(msg_ctx, state, MSG_SMB_NOTIFY_TRIGGER, + notifyd_trigger); + if (tevent_req_nterror(req, status)) { + goto deregister_rec_change; + } + + status = messaging_register(msg_ctx, state, MSG_SMB_NOTIFY_GET_DB, + notifyd_get_db); + if (tevent_req_nterror(req, status)) { + goto deregister_trigger; + } + + names_db = messaging_names_db(msg_ctx); + + ret = server_id_db_set_exclusive(names_db, "notify-daemon"); + if (ret != 0) { + DBG_DEBUG("server_id_db_add failed: %s\n", + strerror(ret)); + tevent_req_error(req, ret); + goto deregister_get_db; + } + + if (ctdbd_conn == NULL) { + /* + * No cluster around, skip the database replication + * engine + */ + return req; + } + +#ifdef CLUSTER_SUPPORT + status = messaging_register(msg_ctx, state, MSG_SMB_NOTIFY_DB, + notifyd_got_db); + if (tevent_req_nterror(req, status)) { + goto deregister_get_db; + } + + state->log = talloc_zero(state, struct messaging_reclog); + if (tevent_req_nomem(state->log, req)) { + goto deregister_db; + } + + subreq = notifyd_broadcast_reclog_send( + state->log, ev, ctdbd_conn, + messaging_server_id(msg_ctx), + state->log); + if (tevent_req_nomem(subreq, req)) { + goto deregister_db; + } + tevent_req_set_callback(subreq, + notifyd_broadcast_reclog_finished, + req); + + subreq = notifyd_clean_peers_send(state, ev, state); + if (tevent_req_nomem(subreq, req)) { + goto deregister_db; + } + tevent_req_set_callback(subreq, notifyd_clean_peers_finished, + req); + + ret = register_with_ctdbd(ctdbd_conn, + CTDB_SRVID_SAMBA_NOTIFY_PROXY, + notifyd_snoop_broadcast, state); + if (ret != 0) { + tevent_req_error(req, ret); + goto deregister_db; + } +#endif + + return req; + +#ifdef CLUSTER_SUPPORT +deregister_db: + messaging_deregister(msg_ctx, MSG_SMB_NOTIFY_DB, state); +#endif +deregister_get_db: + messaging_deregister(msg_ctx, MSG_SMB_NOTIFY_GET_DB, state); +deregister_trigger: + messaging_deregister(msg_ctx, MSG_SMB_NOTIFY_TRIGGER, state); +deregister_rec_change: + messaging_deregister(msg_ctx, MSG_SMB_NOTIFY_REC_CHANGE, state); + return tevent_req_post(req, ev); +} + +#ifdef CLUSTER_SUPPORT + +static void notifyd_broadcast_reclog_finished(struct tevent_req *subreq) +{ + struct tevent_req *req = tevent_req_callback_data( + subreq, struct tevent_req); + int ret; + + ret = notifyd_broadcast_reclog_recv(subreq); + TALLOC_FREE(subreq); + tevent_req_error(req, ret); +} + +static void notifyd_clean_peers_finished(struct tevent_req *subreq) +{ + struct tevent_req *req = tevent_req_callback_data( + subreq, struct tevent_req); + int ret; + + ret = notifyd_clean_peers_recv(subreq); + TALLOC_FREE(subreq); + tevent_req_error(req, ret); +} + +#endif + +int notifyd_recv(struct tevent_req *req) +{ + return tevent_req_simple_recv_unix(req); +} + +static bool notifyd_apply_rec_change( + const struct server_id *client, + const char *path, size_t pathlen, + const struct notify_instance *chg, + struct db_context *entries, + sys_notify_watch_fn sys_notify_watch, + struct sys_notify_context *sys_notify_ctx, + struct messaging_context *msg_ctx) +{ + struct db_record *rec = NULL; + struct notifyd_instance *instances = NULL; + size_t num_instances; + size_t i; + struct notifyd_instance *instance = NULL; + TDB_DATA value; + NTSTATUS status; + bool ok = false; + + if (pathlen == 0) { + DBG_WARNING("pathlen==0\n"); + return false; + } + if (path[pathlen-1] != '\0') { + DBG_WARNING("path not 0-terminated\n"); + return false; + } + + DBG_DEBUG("path=%s, filter=%"PRIu32", subdir_filter=%"PRIu32", " + "private_data=%p\n", + path, + chg->filter, + chg->subdir_filter, + chg->private_data); + + rec = dbwrap_fetch_locked( + entries, entries, + make_tdb_data((const uint8_t *)path, pathlen-1)); + + if (rec == NULL) { + DBG_WARNING("dbwrap_fetch_locked failed\n"); + goto fail; + } + + num_instances = 0; + value = dbwrap_record_get_value(rec); + + if (value.dsize != 0) { + if (!notifyd_parse_entry(value.dptr, value.dsize, NULL, + &num_instances)) { + goto fail; + } + } + + /* + * Overallocate by one instance to avoid a realloc when adding + */ + instances = talloc_array(rec, struct notifyd_instance, + num_instances + 1); + if (instances == NULL) { + DBG_WARNING("talloc failed\n"); + goto fail; + } + + if (value.dsize != 0) { + memcpy(instances, value.dptr, value.dsize); + } + + for (i=0; i<num_instances; i++) { + instance = &instances[i]; + + if (server_id_equal(&instance->client, client) && + (instance->instance.private_data == chg->private_data)) { + break; + } + } + + if (i < num_instances) { + instance->instance = *chg; + } else { + /* + * We've overallocated for one instance + */ + instance = &instances[num_instances]; + + *instance = (struct notifyd_instance) { + .client = *client, + .instance = *chg, + .internal_filter = chg->filter, + .internal_subdir_filter = chg->subdir_filter + }; + + num_instances += 1; + } + + if ((instance->instance.filter != 0) || + (instance->instance.subdir_filter != 0)) { + int ret; + + TALLOC_FREE(instance->sys_watch); + + ret = sys_notify_watch(entries, sys_notify_ctx, path, + &instance->internal_filter, + &instance->internal_subdir_filter, + notifyd_sys_callback, msg_ctx, + &instance->sys_watch); + if (ret != 0) { + DBG_WARNING("sys_notify_watch for [%s] returned %s\n", + path, strerror(errno)); + } + } + + if ((instance->instance.filter == 0) && + (instance->instance.subdir_filter == 0)) { + /* This is a delete request */ + TALLOC_FREE(instance->sys_watch); + *instance = instances[num_instances-1]; + num_instances -= 1; + } + + DBG_DEBUG("%s has %zu instances\n", path, num_instances); + + if (num_instances == 0) { + status = dbwrap_record_delete(rec); + if (!NT_STATUS_IS_OK(status)) { + DBG_WARNING("dbwrap_record_delete returned %s\n", + nt_errstr(status)); + goto fail; + } + } else { + value = make_tdb_data( + (uint8_t *)instances, + sizeof(struct notifyd_instance) * num_instances); + + status = dbwrap_record_store(rec, value, 0); + if (!NT_STATUS_IS_OK(status)) { + DBG_WARNING("dbwrap_record_store returned %s\n", + nt_errstr(status)); + goto fail; + } + } + + ok = true; +fail: + TALLOC_FREE(rec); + return ok; +} + +static void notifyd_sys_callback(struct sys_notify_context *ctx, + void *private_data, struct notify_event *ev, + uint32_t filter) +{ + struct messaging_context *msg_ctx = talloc_get_type_abort( + private_data, struct messaging_context); + struct notify_trigger_msg msg; + struct iovec iov[4]; + char slash = '/'; + + msg = (struct notify_trigger_msg) { + .when = timespec_current(), + .action = ev->action, + .filter = filter, + }; + + iov[0].iov_base = &msg; + iov[0].iov_len = offsetof(struct notify_trigger_msg, path); + iov[1].iov_base = discard_const_p(char, ev->dir); + iov[1].iov_len = strlen(ev->dir); + iov[2].iov_base = &slash; + iov[2].iov_len = 1; + iov[3].iov_base = discard_const_p(char, ev->path); + iov[3].iov_len = strlen(ev->path)+1; + + messaging_send_iov( + msg_ctx, messaging_server_id(msg_ctx), + MSG_SMB_NOTIFY_TRIGGER, iov, ARRAY_SIZE(iov), NULL, 0); +} + +static bool notifyd_parse_rec_change(uint8_t *buf, size_t bufsize, + struct notify_rec_change_msg **pmsg, + size_t *pathlen) +{ + struct notify_rec_change_msg *msg; + + if (bufsize < offsetof(struct notify_rec_change_msg, path) + 1) { + DBG_WARNING("message too short, ignoring: %zu\n", bufsize); + return false; + } + + *pmsg = msg = (struct notify_rec_change_msg *)buf; + *pathlen = bufsize - offsetof(struct notify_rec_change_msg, path); + + DBG_DEBUG("Got rec_change_msg filter=%"PRIu32", " + "subdir_filter=%"PRIu32", private_data=%p, path=%.*s\n", + msg->instance.filter, + msg->instance.subdir_filter, + msg->instance.private_data, + (int)(*pathlen), + msg->path); + + return true; +} + +static void notifyd_rec_change(struct messaging_context *msg_ctx, + void *private_data, uint32_t msg_type, + struct server_id src, DATA_BLOB *data) +{ + struct notifyd_state *state = talloc_get_type_abort( + private_data, struct notifyd_state); + struct server_id_buf idbuf; + struct notify_rec_change_msg *msg; + size_t pathlen; + bool ok; + struct notify_instance instance; + + DBG_DEBUG("Got %zu bytes from %s\n", data->length, + server_id_str_buf(src, &idbuf)); + + ok = notifyd_parse_rec_change(data->data, data->length, + &msg, &pathlen); + if (!ok) { + return; + } + + memcpy(&instance, &msg->instance, sizeof(instance)); /* avoid SIGBUS */ + + ok = notifyd_apply_rec_change( + &src, msg->path, pathlen, &instance, + state->entries, state->sys_notify_watch, state->sys_notify_ctx, + state->msg_ctx); + if (!ok) { + DBG_DEBUG("notifyd_apply_rec_change failed, ignoring\n"); + return; + } + + if ((state->log == NULL) || (state->ctdbd_conn == NULL)) { + return; + } + +#ifdef CLUSTER_SUPPORT + { + + struct messaging_rec **tmp; + struct messaging_reclog *log; + struct iovec iov = { .iov_base = data->data, .iov_len = data->length }; + + log = state->log; + + tmp = talloc_realloc(log, log->recs, struct messaging_rec *, + log->num_recs+1); + if (tmp == NULL) { + DBG_WARNING("talloc_realloc failed, ignoring\n"); + return; + } + log->recs = tmp; + + log->recs[log->num_recs] = messaging_rec_create( + log->recs, src, messaging_server_id(msg_ctx), + msg_type, &iov, 1, NULL, 0); + + if (log->recs[log->num_recs] == NULL) { + DBG_WARNING("messaging_rec_create failed, ignoring\n"); + return; + } + + log->num_recs += 1; + + if (log->num_recs >= 100) { + /* + * Don't let the log grow too large + */ + notifyd_broadcast_reclog(state->ctdbd_conn, + messaging_server_id(msg_ctx), log); + } + + } +#endif +} + +struct notifyd_trigger_state { + struct messaging_context *msg_ctx; + struct notify_trigger_msg *msg; + bool recursive; + bool covered_by_sys_notify; +}; + +static void notifyd_trigger_parser(TDB_DATA key, TDB_DATA data, + void *private_data); + +static void notifyd_trigger(struct messaging_context *msg_ctx, + void *private_data, uint32_t msg_type, + struct server_id src, DATA_BLOB *data) +{ + struct notifyd_state *state = talloc_get_type_abort( + private_data, struct notifyd_state); + struct server_id my_id = messaging_server_id(msg_ctx); + struct notifyd_trigger_state tstate; + const char *path; + const char *p, *next_p; + + if (data->length < offsetof(struct notify_trigger_msg, path) + 1) { + DBG_WARNING("message too short, ignoring: %zu\n", + data->length); + return; + } + if (data->data[data->length-1] != 0) { + DBG_WARNING("path not 0-terminated, ignoring\n");; + return; + } + + tstate.msg_ctx = msg_ctx; + + tstate.covered_by_sys_notify = (src.vnn == my_id.vnn); + tstate.covered_by_sys_notify &= !server_id_equal(&src, &my_id); + + tstate.msg = (struct notify_trigger_msg *)data->data; + path = tstate.msg->path; + + DBG_DEBUG("Got trigger_msg action=%"PRIu32", filter=%"PRIu32", " + "path=%s\n", + tstate.msg->action, + tstate.msg->filter, + path); + + if (path[0] != '/') { + DBG_WARNING("path %s does not start with /, ignoring\n", + path); + return; + } + + for (p = strchr(path+1, '/'); p != NULL; p = next_p) { + ptrdiff_t path_len = p - path; + TDB_DATA key; + uint32_t i; + + next_p = strchr(p+1, '/'); + tstate.recursive = (next_p != NULL); + + DBG_DEBUG("Trying path %.*s\n", (int)path_len, path); + + key = (TDB_DATA) { .dptr = discard_const_p(uint8_t, path), + .dsize = path_len }; + + dbwrap_parse_record(state->entries, key, + notifyd_trigger_parser, &tstate); + + if (state->peers == NULL) { + continue; + } + + if (src.vnn != my_id.vnn) { + continue; + } + + for (i=0; i<state->num_peers; i++) { + if (state->peers[i]->db == NULL) { + /* + * Inactive peer, did not get a db yet + */ + continue; + } + dbwrap_parse_record(state->peers[i]->db, key, + notifyd_trigger_parser, &tstate); + } + } +} + +static void notifyd_send_delete(struct messaging_context *msg_ctx, + TDB_DATA key, + struct notifyd_instance *instance); + +static void notifyd_trigger_parser(TDB_DATA key, TDB_DATA data, + void *private_data) + +{ + struct notifyd_trigger_state *tstate = private_data; + struct notify_event_msg msg = { .action = tstate->msg->action, + .when = tstate->msg->when }; + struct iovec iov[2]; + size_t path_len = key.dsize; + struct notifyd_instance *instances = NULL; + size_t num_instances = 0; + size_t i; + + if (!notifyd_parse_entry(data.dptr, data.dsize, &instances, + &num_instances)) { + DBG_DEBUG("Could not parse notifyd_entry\n"); + return; + } + + DBG_DEBUG("Found %zu instances for %.*s\n", + num_instances, + (int)key.dsize, + (char *)key.dptr); + + iov[0].iov_base = &msg; + iov[0].iov_len = offsetof(struct notify_event_msg, path); + iov[1].iov_base = tstate->msg->path + path_len + 1; + iov[1].iov_len = strlen((char *)(iov[1].iov_base)) + 1; + + for (i=0; i<num_instances; i++) { + struct notifyd_instance *instance = &instances[i]; + struct server_id_buf idbuf; + uint32_t i_filter; + NTSTATUS status; + + if (tstate->covered_by_sys_notify) { + if (tstate->recursive) { + i_filter = instance->internal_subdir_filter; + } else { + i_filter = instance->internal_filter; + } + } else { + if (tstate->recursive) { + i_filter = instance->instance.subdir_filter; + } else { + i_filter = instance->instance.filter; + } + } + + if ((i_filter & tstate->msg->filter) == 0) { + continue; + } + + msg.private_data = instance->instance.private_data; + + status = messaging_send_iov( + tstate->msg_ctx, instance->client, + MSG_PVFS_NOTIFY, iov, ARRAY_SIZE(iov), NULL, 0); + + DBG_DEBUG("messaging_send_iov to %s returned %s\n", + server_id_str_buf(instance->client, &idbuf), + nt_errstr(status)); + + if (NT_STATUS_EQUAL(status, NT_STATUS_OBJECT_NAME_NOT_FOUND) && + procid_is_local(&instance->client)) { + /* + * That process has died + */ + notifyd_send_delete(tstate->msg_ctx, key, instance); + continue; + } + + if (!NT_STATUS_IS_OK(status)) { + DBG_WARNING("messaging_send_iov returned %s\n", + nt_errstr(status)); + } + } +} + +/* + * Send a delete request to ourselves to properly discard a notify + * record for an smbd that has died. + */ + +static void notifyd_send_delete(struct messaging_context *msg_ctx, + TDB_DATA key, + struct notifyd_instance *instance) +{ + struct notify_rec_change_msg msg = { + .instance.private_data = instance->instance.private_data + }; + uint8_t nul = 0; + struct iovec iov[3]; + int ret; + + /* + * Send a rec_change to ourselves to delete a dead entry + */ + + iov[0] = (struct iovec) { + .iov_base = &msg, + .iov_len = offsetof(struct notify_rec_change_msg, path) }; + iov[1] = (struct iovec) { .iov_base = key.dptr, .iov_len = key.dsize }; + iov[2] = (struct iovec) { .iov_base = &nul, .iov_len = sizeof(nul) }; + + ret = messaging_send_iov_from( + msg_ctx, instance->client, messaging_server_id(msg_ctx), + MSG_SMB_NOTIFY_REC_CHANGE, iov, ARRAY_SIZE(iov), NULL, 0); + + if (ret != 0) { + DBG_WARNING("messaging_send_iov_from returned %s\n", + strerror(ret)); + } +} + +static void notifyd_get_db(struct messaging_context *msg_ctx, + void *private_data, uint32_t msg_type, + struct server_id src, DATA_BLOB *data) +{ + struct notifyd_state *state = talloc_get_type_abort( + private_data, struct notifyd_state); + struct server_id_buf id1, id2; + NTSTATUS status; + uint64_t rec_index = UINT64_MAX; + uint8_t index_buf[sizeof(uint64_t)]; + size_t dbsize; + uint8_t *buf; + struct iovec iov[2]; + + dbsize = dbwrap_marshall(state->entries, NULL, 0); + + buf = talloc_array(talloc_tos(), uint8_t, dbsize); + if (buf == NULL) { + DBG_WARNING("talloc_array(%zu) failed\n", dbsize); + return; + } + + dbsize = dbwrap_marshall(state->entries, buf, dbsize); + + if (dbsize != talloc_get_size(buf)) { + DBG_DEBUG("dbsize changed: %zu->%zu\n", + talloc_get_size(buf), + dbsize); + TALLOC_FREE(buf); + return; + } + + if (state->log != NULL) { + rec_index = state->log->rec_index; + } + SBVAL(index_buf, 0, rec_index); + + iov[0] = (struct iovec) { .iov_base = index_buf, + .iov_len = sizeof(index_buf) }; + iov[1] = (struct iovec) { .iov_base = buf, + .iov_len = dbsize }; + + DBG_DEBUG("Sending %zu bytes to %s->%s\n", + iov_buflen(iov, ARRAY_SIZE(iov)), + server_id_str_buf(messaging_server_id(msg_ctx), &id1), + server_id_str_buf(src, &id2)); + + status = messaging_send_iov(msg_ctx, src, MSG_SMB_NOTIFY_DB, + iov, ARRAY_SIZE(iov), NULL, 0); + TALLOC_FREE(buf); + if (!NT_STATUS_IS_OK(status)) { + DBG_WARNING("messaging_send_iov failed: %s\n", + nt_errstr(status)); + } +} + +#ifdef CLUSTER_SUPPORT + +static int notifyd_add_proxy_syswatches(struct db_record *rec, + void *private_data); + +static void notifyd_got_db(struct messaging_context *msg_ctx, + void *private_data, uint32_t msg_type, + struct server_id src, DATA_BLOB *data) +{ + struct notifyd_state *state = talloc_get_type_abort( + private_data, struct notifyd_state); + struct notifyd_peer *p = NULL; + struct server_id_buf idbuf; + NTSTATUS status; + int count; + size_t i; + + for (i=0; i<state->num_peers; i++) { + if (server_id_equal(&src, &state->peers[i]->pid)) { + p = state->peers[i]; + break; + } + } + + if (p == NULL) { + DBG_DEBUG("Did not find peer for db from %s\n", + server_id_str_buf(src, &idbuf)); + return; + } + + if (data->length < 8) { + DBG_DEBUG("Got short db length %zu from %s\n", data->length, + server_id_str_buf(src, &idbuf)); + TALLOC_FREE(p); + return; + } + + p->rec_index = BVAL(data->data, 0); + + p->db = db_open_rbt(p); + if (p->db == NULL) { + DBG_DEBUG("db_open_rbt failed\n"); + TALLOC_FREE(p); + return; + } + + status = dbwrap_unmarshall(p->db, data->data + 8, + data->length - 8); + if (!NT_STATUS_IS_OK(status)) { + DBG_DEBUG("dbwrap_unmarshall returned %s for db %s\n", + nt_errstr(status), + server_id_str_buf(src, &idbuf)); + TALLOC_FREE(p); + return; + } + + dbwrap_traverse_read(p->db, notifyd_add_proxy_syswatches, state, + &count); + + DBG_DEBUG("Database from %s contained %d records\n", + server_id_str_buf(src, &idbuf), + count); +} + +static void notifyd_broadcast_reclog(struct ctdbd_connection *ctdbd_conn, + struct server_id src, + struct messaging_reclog *log) +{ + enum ndr_err_code ndr_err; + uint8_t msghdr[MESSAGE_HDR_LENGTH]; + DATA_BLOB blob; + struct iovec iov[2]; + int ret; + + if (log == NULL) { + return; + } + + DBG_DEBUG("rec_index=%"PRIu64", num_recs=%"PRIu32"\n", + log->rec_index, + log->num_recs); + + message_hdr_put(msghdr, MSG_SMB_NOTIFY_REC_CHANGES, src, + (struct server_id) {0 }); + iov[0] = (struct iovec) { .iov_base = msghdr, + .iov_len = sizeof(msghdr) }; + + ndr_err = ndr_push_struct_blob( + &blob, log, log, + (ndr_push_flags_fn_t)ndr_push_messaging_reclog); + if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) { + DBG_WARNING("ndr_push_messaging_recs failed: %s\n", + ndr_errstr(ndr_err)); + goto done; + } + iov[1] = (struct iovec) { .iov_base = blob.data, + .iov_len = blob.length }; + + ret = ctdbd_messaging_send_iov( + ctdbd_conn, CTDB_BROADCAST_CONNECTED, + CTDB_SRVID_SAMBA_NOTIFY_PROXY, iov, ARRAY_SIZE(iov)); + TALLOC_FREE(blob.data); + if (ret != 0) { + DBG_WARNING("ctdbd_messaging_send failed: %s\n", + strerror(ret)); + goto done; + } + + log->rec_index += 1; + +done: + log->num_recs = 0; + TALLOC_FREE(log->recs); +} + +struct notifyd_broadcast_reclog_state { + struct tevent_context *ev; + struct ctdbd_connection *ctdbd_conn; + struct server_id src; + struct messaging_reclog *log; +}; + +static void notifyd_broadcast_reclog_next(struct tevent_req *subreq); + +static struct tevent_req *notifyd_broadcast_reclog_send( + TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdbd_connection *ctdbd_conn, struct server_id src, + struct messaging_reclog *log) +{ + struct tevent_req *req, *subreq; + struct notifyd_broadcast_reclog_state *state; + + req = tevent_req_create(mem_ctx, &state, + struct notifyd_broadcast_reclog_state); + if (req == NULL) { + return NULL; + } + state->ev = ev; + state->ctdbd_conn = ctdbd_conn; + state->src = src; + state->log = log; + + subreq = tevent_wakeup_send(state, state->ev, + timeval_current_ofs_msec(1000)); + if (tevent_req_nomem(subreq, req)) { + return tevent_req_post(req, ev); + } + tevent_req_set_callback(subreq, notifyd_broadcast_reclog_next, req); + return req; +} + +static void notifyd_broadcast_reclog_next(struct tevent_req *subreq) +{ + struct tevent_req *req = tevent_req_callback_data( + subreq, struct tevent_req); + struct notifyd_broadcast_reclog_state *state = tevent_req_data( + req, struct notifyd_broadcast_reclog_state); + bool ok; + + ok = tevent_wakeup_recv(subreq); + TALLOC_FREE(subreq); + if (!ok) { + tevent_req_oom(req); + return; + } + + notifyd_broadcast_reclog(state->ctdbd_conn, state->src, state->log); + + subreq = tevent_wakeup_send(state, state->ev, + timeval_current_ofs_msec(1000)); + if (tevent_req_nomem(subreq, req)) { + return; + } + tevent_req_set_callback(subreq, notifyd_broadcast_reclog_next, req); +} + +static int notifyd_broadcast_reclog_recv(struct tevent_req *req) +{ + return tevent_req_simple_recv_unix(req); +} + +struct notifyd_clean_peers_state { + struct tevent_context *ev; + struct notifyd_state *notifyd; +}; + +static void notifyd_clean_peers_next(struct tevent_req *subreq); + +static struct tevent_req *notifyd_clean_peers_send( + TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct notifyd_state *notifyd) +{ + struct tevent_req *req, *subreq; + struct notifyd_clean_peers_state *state; + + req = tevent_req_create(mem_ctx, &state, + struct notifyd_clean_peers_state); + if (req == NULL) { + return NULL; + } + state->ev = ev; + state->notifyd = notifyd; + + subreq = tevent_wakeup_send(state, state->ev, + timeval_current_ofs_msec(30000)); + if (tevent_req_nomem(subreq, req)) { + return tevent_req_post(req, ev); + } + tevent_req_set_callback(subreq, notifyd_clean_peers_next, req); + return req; +} + +static void notifyd_clean_peers_next(struct tevent_req *subreq) +{ + struct tevent_req *req = tevent_req_callback_data( + subreq, struct tevent_req); + struct notifyd_clean_peers_state *state = tevent_req_data( + req, struct notifyd_clean_peers_state); + struct notifyd_state *notifyd = state->notifyd; + size_t i; + bool ok; + time_t now = time(NULL); + + ok = tevent_wakeup_recv(subreq); + TALLOC_FREE(subreq); + if (!ok) { + tevent_req_oom(req); + return; + } + + i = 0; + while (i < notifyd->num_peers) { + struct notifyd_peer *p = notifyd->peers[i]; + + if ((now - p->last_broadcast) > 60) { + struct server_id_buf idbuf; + + /* + * Haven't heard for more than 60 seconds. Call this + * peer dead + */ + + DBG_DEBUG("peer %s died\n", + server_id_str_buf(p->pid, &idbuf)); + /* + * This implicitly decrements notifyd->num_peers + */ + TALLOC_FREE(p); + } else { + i += 1; + } + } + + subreq = tevent_wakeup_send(state, state->ev, + timeval_current_ofs_msec(30000)); + if (tevent_req_nomem(subreq, req)) { + return; + } + tevent_req_set_callback(subreq, notifyd_clean_peers_next, req); +} + +static int notifyd_clean_peers_recv(struct tevent_req *req) +{ + return tevent_req_simple_recv_unix(req); +} + +static int notifyd_add_proxy_syswatches(struct db_record *rec, + void *private_data) +{ + struct notifyd_state *state = talloc_get_type_abort( + private_data, struct notifyd_state); + struct db_context *db = dbwrap_record_get_db(rec); + TDB_DATA key = dbwrap_record_get_key(rec); + TDB_DATA value = dbwrap_record_get_value(rec); + struct notifyd_instance *instances = NULL; + size_t num_instances = 0; + size_t i; + char path[key.dsize+1]; + bool ok; + + memcpy(path, key.dptr, key.dsize); + path[key.dsize] = '\0'; + + ok = notifyd_parse_entry(value.dptr, value.dsize, &instances, + &num_instances); + if (!ok) { + DBG_WARNING("Could not parse notifyd entry for %s\n", path); + return 0; + } + + for (i=0; i<num_instances; i++) { + struct notifyd_instance *instance = &instances[i]; + uint32_t filter = instance->instance.filter; + uint32_t subdir_filter = instance->instance.subdir_filter; + int ret; + + /* + * This is a remote database. Pointers that we were + * given don't make sense locally. Initialize to NULL + * in case sys_notify_watch fails. + */ + instances[i].sys_watch = NULL; + + ret = state->sys_notify_watch( + db, state->sys_notify_ctx, path, + &filter, &subdir_filter, + notifyd_sys_callback, state->msg_ctx, + &instance->sys_watch); + if (ret != 0) { + DBG_WARNING("inotify_watch returned %s\n", + strerror(errno)); + } + } + + return 0; +} + +static int notifyd_db_del_syswatches(struct db_record *rec, void *private_data) +{ + TDB_DATA key = dbwrap_record_get_key(rec); + TDB_DATA value = dbwrap_record_get_value(rec); + struct notifyd_instance *instances = NULL; + size_t num_instances = 0; + size_t i; + bool ok; + + ok = notifyd_parse_entry(value.dptr, value.dsize, &instances, + &num_instances); + if (!ok) { + DBG_WARNING("Could not parse notifyd entry for %.*s\n", + (int)key.dsize, (char *)key.dptr); + return 0; + } + for (i=0; i<num_instances; i++) { + TALLOC_FREE(instances[i].sys_watch); + } + return 0; +} + +static int notifyd_peer_destructor(struct notifyd_peer *p) +{ + struct notifyd_state *state = p->state; + size_t i; + + if (p->db != NULL) { + dbwrap_traverse_read(p->db, notifyd_db_del_syswatches, + NULL, NULL); + } + + for (i = 0; i<state->num_peers; i++) { + if (p == state->peers[i]) { + state->peers[i] = state->peers[state->num_peers-1]; + state->num_peers -= 1; + break; + } + } + return 0; +} + +static struct notifyd_peer *notifyd_peer_new( + struct notifyd_state *state, struct server_id pid) +{ + struct notifyd_peer *p, **tmp; + + tmp = talloc_realloc(state, state->peers, struct notifyd_peer *, + state->num_peers+1); + if (tmp == NULL) { + return NULL; + } + state->peers = tmp; + + p = talloc_zero(state->peers, struct notifyd_peer); + if (p == NULL) { + return NULL; + } + p->state = state; + p->pid = pid; + + state->peers[state->num_peers] = p; + state->num_peers += 1; + + talloc_set_destructor(p, notifyd_peer_destructor); + + return p; +} + +static void notifyd_apply_reclog(struct notifyd_peer *peer, + const uint8_t *msg, size_t msglen) +{ + struct notifyd_state *state = peer->state; + DATA_BLOB blob = { .data = discard_const_p(uint8_t, msg), + .length = msglen }; + struct server_id_buf idbuf; + struct messaging_reclog *log; + enum ndr_err_code ndr_err; + uint32_t i; + + if (peer->db == NULL) { + /* + * No db yet + */ + return; + } + + log = talloc(peer, struct messaging_reclog); + if (log == NULL) { + DBG_DEBUG("talloc failed\n"); + return; + } + + ndr_err = ndr_pull_struct_blob_all( + &blob, log, log, + (ndr_pull_flags_fn_t)ndr_pull_messaging_reclog); + if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) { + DBG_DEBUG("ndr_pull_messaging_reclog failed: %s\n", + ndr_errstr(ndr_err)); + goto fail; + } + + DBG_DEBUG("Got %"PRIu32" recs index %"PRIu64" from %s\n", + log->num_recs, + log->rec_index, + server_id_str_buf(peer->pid, &idbuf)); + + if (log->rec_index != peer->rec_index) { + DBG_INFO("Got rec index %"PRIu64" from %s, " + "expected %"PRIu64"\n", + log->rec_index, + server_id_str_buf(peer->pid, &idbuf), + peer->rec_index); + goto fail; + } + + for (i=0; i<log->num_recs; i++) { + struct messaging_rec *r = log->recs[i]; + struct notify_rec_change_msg *chg; + size_t pathlen; + bool ok; + struct notify_instance instance; + + ok = notifyd_parse_rec_change(r->buf.data, r->buf.length, + &chg, &pathlen); + if (!ok) { + DBG_INFO("notifyd_parse_rec_change failed\n"); + goto fail; + } + + /* avoid SIGBUS */ + memcpy(&instance, &chg->instance, sizeof(instance)); + + ok = notifyd_apply_rec_change(&r->src, chg->path, pathlen, + &instance, peer->db, + state->sys_notify_watch, + state->sys_notify_ctx, + state->msg_ctx); + if (!ok) { + DBG_INFO("notifyd_apply_rec_change failed\n"); + goto fail; + } + } + + peer->rec_index += 1; + peer->last_broadcast = time(NULL); + + TALLOC_FREE(log); + return; + +fail: + DBG_DEBUG("Dropping peer %s\n", + server_id_str_buf(peer->pid, &idbuf)); + TALLOC_FREE(peer); +} + +/* + * Receive messaging_reclog (log of MSG_SMB_NOTIFY_REC_CHANGE + * messages) broadcasts by other notifyds. Several cases: + * + * We don't know the source. This creates a new peer. Creating a peer + * involves asking the peer for its full database. We assume ordered + * messages, so the new database will arrive before the next broadcast + * will. + * + * We know the source and the log index matches. We will apply the log + * locally to our peer's db as if we had received it from a local + * client. + * + * We know the source but the log index does not match. This means we + * lost a message. We just drop the whole peer and wait for the next + * broadcast, which will then trigger a fresh database pull. + */ + +static int notifyd_snoop_broadcast(struct tevent_context *ev, + uint32_t src_vnn, uint32_t dst_vnn, + uint64_t dst_srvid, + const uint8_t *msg, size_t msglen, + void *private_data) +{ + struct notifyd_state *state = talloc_get_type_abort( + private_data, struct notifyd_state); + struct server_id my_id = messaging_server_id(state->msg_ctx); + struct notifyd_peer *p; + uint32_t i; + uint32_t msg_type; + struct server_id src, dst; + struct server_id_buf idbuf; + NTSTATUS status; + + if (msglen < MESSAGE_HDR_LENGTH) { + DBG_DEBUG("Got short broadcast\n"); + return 0; + } + message_hdr_get(&msg_type, &src, &dst, msg); + + if (msg_type != MSG_SMB_NOTIFY_REC_CHANGES) { + DBG_DEBUG("Got message %"PRIu32", ignoring\n", msg_type); + return 0; + } + if (server_id_equal(&src, &my_id)) { + DBG_DEBUG("Ignoring my own broadcast\n"); + return 0; + } + + DBG_DEBUG("Got MSG_SMB_NOTIFY_REC_CHANGES from %s\n", + server_id_str_buf(src, &idbuf)); + + for (i=0; i<state->num_peers; i++) { + if (server_id_equal(&state->peers[i]->pid, &src)) { + + DBG_DEBUG("Applying changes to peer %"PRIu32"\n", i); + + notifyd_apply_reclog(state->peers[i], + msg + MESSAGE_HDR_LENGTH, + msglen - MESSAGE_HDR_LENGTH); + return 0; + } + } + + DBG_DEBUG("Creating new peer for %s\n", + server_id_str_buf(src, &idbuf)); + + p = notifyd_peer_new(state, src); + if (p == NULL) { + DBG_DEBUG("notifyd_peer_new failed\n"); + return 0; + } + + status = messaging_send_buf(state->msg_ctx, src, MSG_SMB_NOTIFY_GET_DB, + NULL, 0); + if (!NT_STATUS_IS_OK(status)) { + DBG_DEBUG("messaging_send_buf failed: %s\n", + nt_errstr(status)); + TALLOC_FREE(p); + return 0; + } + + return 0; +} +#endif diff --git a/source3/smbd/notifyd/notifyd.h b/source3/smbd/notifyd/notifyd.h new file mode 100644 index 0000000..5ef8c4c --- /dev/null +++ b/source3/smbd/notifyd/notifyd.h @@ -0,0 +1,145 @@ +/* + * Unix SMB/CIFS implementation. + * + * Copyright (C) Volker Lendecke 2014 + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +#ifndef __NOTIFYD_NOTIFYD_H__ +#define __NOTIFYD_NOTIFYD_H__ + +#include "includes.h" +#include "librpc/gen_ndr/notify.h" +#include "librpc/gen_ndr/messaging.h" +#include "lib/dbwrap/dbwrap.h" +#include "lib/dbwrap/dbwrap_rbt.h" +#include "messages.h" +#include "tdb.h" +#include "util_tdb.h" + +/* + * Filechangenotify based on asynchronous messages + * + * smbds talk to local notify daemons to inform them about paths they are + * interested in. They also tell local notify daemons about changes they have + * done to the file system. There's two message types from smbd to + * notifyd. The first is used to inform notifyd about changes in notify + * interest. These are only sent from smbd to notifyd if the SMB client issues + * FileChangeNotify requests. + */ + +/* + * The notifyd implementation is designed to cope with multiple daemons taking + * care of just a subset of smbds. The goal is to minimize the traffic between + * the notify daemons. The idea behind this is a samba/ctdb cluster, but it + * could also be used to spread the load of notifyd instances on a single + * node, should this become a bottleneck. The following diagram illustrates + * the setup. The numbers in the boxes are node:process ids. + * + * +-----------+ +-----------+ + * |notifyd 0:5|------------------|notifyd 1:6| + * +-----------+ +-----------+ + * / | \ / \ + * / | \ / \ + * +--------+ | +--------+ +--------+ +--------+ + * |smbd 0:1| | |smbd 0:4| |smbd 1:7| |smbd 1:2| + * +--------+ | +--------+ +--------+ +--------+ + * | + * +---------+ + * |smbd 0:20| + * +---------+ + * + * Suppose 0:1 and 0:4 are interested in changes for /foo and 0:20 creates the + * file /foo/bar, if everything fully connected, 0:20 would have to send two + * local messages, one to 0:1 and one to 0:4. With the notifyd design, 0:20 + * only has to send one message, it lets notifyd 0:5 do the hard work to + * multicast the change to 0:1 and 0:4. + * + * Now lets assume 1:7 on the other node creates /foo/baz. It tells its + * notifyd 1:6 about this change. All 1:6 will know about is that its peer + * notifyd 0:5 is interested in the change. Thus it forwards the event to 0:5, + * which sees it as if it came from just another local event creator. 0:5 will + * multicast the change to 0:1 and 0:4. To prevent notify loops, the message + * from 1:6 to 0:5 will carry a "proxied" flag, so that 0:5 will only forward + * the event to local clients. + */ + +/* + * Data that notifyd maintains per smbd notify instance + */ +struct notify_instance { + struct timespec creation_time; + uint32_t filter; + uint32_t subdir_filter; + void *private_data; +}; + +/* MSG_SMB_NOTIFY_REC_CHANGE payload */ +struct notify_rec_change_msg { + struct notify_instance instance; + char path[]; +}; + +/* + * The second message from smbd to notifyd is sent whenever an smbd makes a + * file system change. It tells notifyd to inform all interested parties about + * that change. This is the message that needs to be really fast in smbd + * because it is called a lot. + */ + +/* MSG_SMB_NOTIFY_TRIGGER payload */ +struct notify_trigger_msg { + struct timespec when; + uint32_t action; + uint32_t filter; + char path[]; +}; + +/* + * In response to a MSG_SMB_NOTIFY_TRIGGER message notifyd walks its database + * and sends out the following message to all interested clients + */ + +/* MSG_PVFS_NOTIFY payload */ +struct notify_event_msg { + struct timespec when; + void *private_data; + uint32_t action; + char path[]; +}; + +struct sys_notify_context; +struct ctdbd_connection; + +typedef int (*sys_notify_watch_fn)(TALLOC_CTX *mem_ctx, + struct sys_notify_context *ctx, + const char *path, + uint32_t *filter, + uint32_t *subdir_filter, + void (*callback)(struct sys_notify_context *ctx, + void *private_data, + struct notify_event *ev, + uint32_t filter), + void *private_data, + void *handle_p); + +struct tevent_req *notifyd_send(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct messaging_context *msg_ctx, + struct ctdbd_connection *ctdbd_conn, + sys_notify_watch_fn sys_notify_watch, + struct sys_notify_context *sys_notify_ctx); +int notifyd_recv(struct tevent_req *req); + +#endif diff --git a/source3/smbd/notifyd/notifyd_db.c b/source3/smbd/notifyd/notifyd_db.c new file mode 100644 index 0000000..1822861 --- /dev/null +++ b/source3/smbd/notifyd/notifyd_db.c @@ -0,0 +1,165 @@ +/* + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +#include "replace.h" +#include "lib/util/debug.h" +#include "lib/util/server_id_db.h" +#include "notifyd_private.h" +#include "notifyd_db.h" + +struct notifyd_parse_db_state { + bool (*fn)(const char *path, + struct server_id server, + const struct notify_instance *instance, + void *private_data); + void *private_data; +}; + +static bool notifyd_parse_db_parser(TDB_DATA key, TDB_DATA value, + void *private_data) +{ + struct notifyd_parse_db_state *state = private_data; + char path[key.dsize+1]; + struct notifyd_instance *instances = NULL; + size_t num_instances = 0; + size_t i; + bool ok; + + memcpy(path, key.dptr, key.dsize); + path[key.dsize] = 0; + + ok = notifyd_parse_entry(value.dptr, value.dsize, &instances, + &num_instances); + if (!ok) { + DBG_DEBUG("Could not parse entry for path %s\n", path); + return true; + } + + for (i=0; i<num_instances; i++) { + ok = state->fn(path, instances[i].client, + &instances[i].instance, + state->private_data); + if (!ok) { + return false; + } + } + + return true; +} + +static NTSTATUS notifyd_parse_db( + const uint8_t *buf, + size_t buflen, + uint64_t *log_index, + bool (*fn)(const char *path, + struct server_id server, + const struct notify_instance *instance, + void *private_data), + void *private_data) +{ + struct notifyd_parse_db_state state = { + .fn = fn, .private_data = private_data + }; + NTSTATUS status; + + if (buflen < 8) { + return NT_STATUS_INTERNAL_DB_CORRUPTION; + } + *log_index = BVAL(buf, 0); + + buf += 8; + buflen -= 8; + + status = dbwrap_parse_marshall_buf( + buf, buflen, notifyd_parse_db_parser, &state); + return status; +} + +NTSTATUS notify_walk(struct messaging_context *msg_ctx, + bool (*fn)(const char *path, struct server_id server, + const struct notify_instance *instance, + void *private_data), + void *private_data) +{ + struct server_id_db *names_db = NULL; + struct server_id notifyd = { .pid = 0, }; + struct tevent_context *ev = NULL; + struct tevent_req *req = NULL; + struct messaging_rec *rec = NULL; + uint64_t log_idx; + NTSTATUS status; + int ret; + bool ok; + + names_db = messaging_names_db(msg_ctx); + ok = server_id_db_lookup_one(names_db, "notify-daemon", ¬ifyd); + if (!ok) { + DBG_WARNING("No notify daemon around\n"); + return NT_STATUS_SERVER_UNAVAILABLE; + } + + ev = samba_tevent_context_init(msg_ctx); + if (ev == NULL) { + return NT_STATUS_NO_MEMORY; + } + + req = messaging_read_send(ev, ev, msg_ctx, MSG_SMB_NOTIFY_DB); + if (req == NULL) { + TALLOC_FREE(ev); + return NT_STATUS_NO_MEMORY; + } + + ok = tevent_req_set_endtime(req, ev, timeval_current_ofs(10, 0)); + if (!ok) { + TALLOC_FREE(ev); + return NT_STATUS_NO_MEMORY; + } + + status = messaging_send_buf( + msg_ctx, notifyd, MSG_SMB_NOTIFY_GET_DB, NULL, 0); + if (!NT_STATUS_IS_OK(status)) { + DBG_DEBUG("messaging_send_buf failed: %s\n", + nt_errstr(status)); + TALLOC_FREE(ev); + return status; + } + + ok = tevent_req_poll(req, ev); + if (!ok) { + DBG_DEBUG("tevent_req_poll failed\n"); + TALLOC_FREE(ev); + return NT_STATUS_INTERNAL_ERROR; + } + + ret = messaging_read_recv(req, ev, &rec); + if (ret != 0) { + DBG_DEBUG("messaging_read_recv failed: %s\n", + strerror(ret)); + TALLOC_FREE(ev); + return map_nt_error_from_unix(ret); + } + + status = notifyd_parse_db( + rec->buf.data, rec->buf.length, &log_idx, fn, private_data); + if (!NT_STATUS_IS_OK(status)) { + DBG_DEBUG("notifyd_parse_db failed: %s\n", + nt_errstr(status)); + TALLOC_FREE(ev); + return status; + } + + TALLOC_FREE(ev); + return NT_STATUS_OK; +} diff --git a/source3/smbd/notifyd/notifyd_db.h b/source3/smbd/notifyd/notifyd_db.h new file mode 100644 index 0000000..bd9e226 --- /dev/null +++ b/source3/smbd/notifyd/notifyd_db.h @@ -0,0 +1,27 @@ +/* + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +#ifndef __NOTIFYD_NOTIFYD_DB_H__ +#define __NOTIFYD_NOTIFYD_DB_H__ +#include "replace.h" +#include "notifyd.h" + +NTSTATUS notify_walk(struct messaging_context *msg_ctx, + bool (*fn)(const char *path, struct server_id server, + const struct notify_instance *instance, + void *private_data), + void *private_data); + +#endif diff --git a/source3/smbd/notifyd/notifyd_entry.c b/source3/smbd/notifyd/notifyd_entry.c new file mode 100644 index 0000000..539010d --- /dev/null +++ b/source3/smbd/notifyd/notifyd_entry.c @@ -0,0 +1,42 @@ +/* + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +#include "replace.h" +#include "lib/util/debug.h" +#include "notifyd_private.h" + +/* + * Parse an entry in the notifyd_context->entries database + */ + +bool notifyd_parse_entry( + uint8_t *buf, + size_t buflen, + struct notifyd_instance **instances, + size_t *num_instances) +{ + if ((buflen % sizeof(struct notifyd_instance)) != 0) { + DBG_WARNING("invalid buffer size: %zu\n", buflen); + return false; + } + + if (instances != NULL) { + *instances = (struct notifyd_instance *)buf; + } + if (num_instances != NULL) { + *num_instances = buflen / sizeof(struct notifyd_instance); + } + return true; +} diff --git a/source3/smbd/notifyd/notifyd_private.h b/source3/smbd/notifyd/notifyd_private.h new file mode 100644 index 0000000..36c08f4 --- /dev/null +++ b/source3/smbd/notifyd/notifyd_private.h @@ -0,0 +1,49 @@ +/* + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +#ifndef __NOTIFYD_PRIVATE_H__ +#define __NOTIFYD_PRIVATE_H__ + +#include "replace.h" +#include "lib/util/server_id.h" +#include "notifyd.h" + +/* + * notifyd's representation of a notify instance + */ +struct notifyd_instance { + struct server_id client; + struct notify_instance instance; + + void *sys_watch; /* inotify/fam/etc handle */ + + /* + * Filters after sys_watch took responsibility of some bits + */ + uint32_t internal_filter; + uint32_t internal_subdir_filter; +}; + +/* + * Parse an entry in the notifyd_context->entries database + */ + +bool notifyd_parse_entry( + uint8_t *buf, + size_t buflen, + struct notifyd_instance **instances, + size_t *num_instances); + +#endif diff --git a/source3/smbd/notifyd/notifydd.c b/source3/smbd/notifyd/notifydd.c new file mode 100644 index 0000000..26bfcd8 --- /dev/null +++ b/source3/smbd/notifyd/notifydd.c @@ -0,0 +1,89 @@ +/* + * Unix SMB/CIFS implementation. + * + * Copyright (C) Volker Lendecke 2014 + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +#include "replace.h" +#include "notifyd.h" +#include "lib/messages_ctdb.h" +#include <tevent.h> +#include "lib/util/tevent_unix.h" + +int main(int argc, const char *argv[]) +{ + TALLOC_CTX *frame; + struct tevent_context *ev; + struct messaging_context *msg; + struct tevent_req *req; + int err, ret; + bool ok; + + talloc_enable_leak_report_full(); + + frame = talloc_stackframe(); + + setup_logging("notifyd", DEBUG_DEFAULT_STDOUT); + lp_set_cmdline("log level", "10"); + + ok = lp_load_initial_only(get_dyn_CONFIGFILE()); + if (!ok) { + fprintf(stderr, "Can't load %s - run testparm to debug it\n", + get_dyn_CONFIGFILE()); + return 1; + } + + ev = samba_tevent_context_init(frame); + if (ev == NULL) { + fprintf(stderr, "samba_tevent_context_init failed\n"); + return 1; + } + + msg = messaging_init(ev, ev); + if (msg == NULL) { + fprintf(stderr, "messaging_init failed\n"); + return 1; + } + + if (!lp_load_global(get_dyn_CONFIGFILE())) { + fprintf(stderr, "Can't load %s - run testparm to debug it\n", + get_dyn_CONFIGFILE()); + return 1; + } + + req = notifyd_send(ev, ev, msg, messaging_ctdb_connection(), + NULL, NULL); + if (req == NULL) { + fprintf(stderr, "notifyd_send failed\n"); + return 1; + } + + ok = tevent_req_poll_unix(req, ev, &err); + if (!ok) { + fprintf(stderr, "tevent_req_poll_unix failed: %s\n", + strerror(err)); + return 1; + } + + ret = notifyd_recv(req); + + printf("notifyd_recv returned %d (%s)\n", ret, + ret ? strerror(ret) : "ok"); + + TALLOC_FREE(frame); + + return 0; +} diff --git a/source3/smbd/notifyd/test_notifyd.c b/source3/smbd/notifyd/test_notifyd.c new file mode 100644 index 0000000..431da9b --- /dev/null +++ b/source3/smbd/notifyd/test_notifyd.c @@ -0,0 +1,347 @@ +/* + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +#include "replace.h" +#include "fcn_wait.h" +#include "notifyd.h" +#include "notifyd_db.h" +#include "messages.h" +#include "lib/util/server_id.h" +#include "lib/util/server_id_db.h" +#include "lib/util/tevent_ntstatus.h" +#include "lib/torture/torture.h" +#include "torture/local/proto.h" +#include "lib/param/loadparm.h" +#include "source3/param/loadparm.h" +#include "source4/torture/smbtorture.h" + +struct fcn_test_state { + struct tevent_req *fcn_req; + bool got_trigger; +}; + +static void fcn_test_done(struct tevent_req *subreq); + +static struct tevent_req *fcn_test_send( + TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + struct messaging_context *msg_ctx, + struct server_id notifyd, + const char *fcn_path, + uint32_t fcn_filter, + uint32_t fcn_subdir_filter, + const char *trigger_path, + uint32_t trigger_action, + uint32_t trigger_filter) +{ + struct tevent_req *req = NULL; + struct fcn_test_state *state = NULL; + struct notify_trigger_msg msg; + struct iovec iov[2]; + NTSTATUS status; + + req = tevent_req_create(mem_ctx, &state, struct fcn_test_state); + if (req == NULL) { + return NULL; + } + + state->fcn_req = fcn_wait_send( + state, + ev, + msg_ctx, + notifyd, + fcn_path, + fcn_filter, + fcn_subdir_filter); + if (tevent_req_nomem(state->fcn_req, req)) { + return tevent_req_post(req, ev); + } + tevent_req_set_callback(state->fcn_req, fcn_test_done, req); + + msg = (struct notify_trigger_msg) { + .when = timespec_current(), + .action = trigger_action, + .filter = trigger_filter, + }; + iov[0] = (struct iovec) { + .iov_base = &msg, + .iov_len = offsetof(struct notify_trigger_msg, path), + }; + iov[1] = (struct iovec) { + .iov_base = discard_const_p(char, trigger_path), + .iov_len = strlen(trigger_path)+1, + }; + + status = messaging_send_iov( + msg_ctx, + notifyd, + MSG_SMB_NOTIFY_TRIGGER, + iov, + ARRAY_SIZE(iov), + NULL, + 0); + if (tevent_req_nterror(req, status)) { + return tevent_req_post(req, ev); + } + + return req; +} + +static void fcn_test_done(struct tevent_req *subreq) +{ + struct tevent_req *req = tevent_req_callback_data( + subreq, struct tevent_req); + struct fcn_test_state *state = tevent_req_data( + req, struct fcn_test_state); + NTSTATUS status; + bool ok; + + SMB_ASSERT(subreq == state->fcn_req); + + status = fcn_wait_recv(subreq, NULL, NULL, NULL, NULL); + + if (NT_STATUS_EQUAL(status, NT_STATUS_CANCELLED)) { + TALLOC_FREE(subreq); + state->fcn_req = NULL; + tevent_req_done(req); + return; + } + + if (tevent_req_nterror(req, status)) { + TALLOC_FREE(subreq); + state->fcn_req = NULL; + return; + } + + state->got_trigger = true; + + ok = tevent_req_cancel(subreq); + if (!ok) { + tevent_req_nterror(req, NT_STATUS_INTERNAL_ERROR); + return; + } +} + +static NTSTATUS fcn_test_recv(struct tevent_req *req, bool *got_trigger) +{ + struct fcn_test_state *state = tevent_req_data( + req, struct fcn_test_state); + NTSTATUS status; + + if (tevent_req_is_nterror(req, &status)) { + return status; + } + if (got_trigger != NULL) { + *got_trigger = state->got_trigger; + } + + return NT_STATUS_OK; +} + +static NTSTATUS fcn_test( + struct messaging_context *msg_ctx, + struct server_id notifyd, + const char *fcn_path, + uint32_t fcn_filter, + uint32_t fcn_subdir_filter, + const char *trigger_path, + uint32_t trigger_action, + uint32_t trigger_filter, + bool *got_trigger) +{ + struct tevent_context *ev = NULL; + struct tevent_req *req = NULL; + NTSTATUS status = NT_STATUS_NO_MEMORY; + + ev = samba_tevent_context_init(msg_ctx); + if (ev == NULL) { + goto fail; + } + req = fcn_test_send( + ev, + ev, + msg_ctx, + notifyd, + fcn_path, + fcn_filter, + fcn_subdir_filter, + trigger_path, + trigger_action, + trigger_filter); + if (req == NULL) { + goto fail; + } + if (!tevent_req_poll_ntstatus(req, ev, &status)) { + goto fail; + } + status = fcn_test_recv(req, got_trigger); +fail: + TALLOC_FREE(ev); + return status; +} + +static bool test_notifyd_trigger1(struct torture_context *tctx) +{ + struct messaging_context *msg_ctx = NULL; + struct server_id_db *names = NULL; + struct server_id notifyd; + NTSTATUS status; + bool got_trigger = false; + bool ok; + + /* + * Basic filechangenotify test: Wait for /home, trigger on + * /home/foo, check an event was received + */ + + lp_load_global(tctx->lp_ctx->szConfigFile); + + msg_ctx = messaging_init(tctx, tctx->ev); + torture_assert_not_null(tctx, msg_ctx, "messaging_init"); + + names = messaging_names_db(msg_ctx); + ok = server_id_db_lookup_one(names, "notify-daemon", ¬ifyd); + torture_assert(tctx, ok, "server_id_db_lookup_one"); + + status = fcn_test( + msg_ctx, + notifyd, + "/home", + UINT32_MAX, + UINT32_MAX, + "/home/foo", + UINT32_MAX, + UINT32_MAX, + &got_trigger); + torture_assert_ntstatus_ok(tctx, status, "fcn_test"); + torture_assert(tctx, got_trigger, "got_trigger"); + + return true; +} + +struct notifyd_have_state { + struct server_id self; + bool found; +}; + +static bool notifyd_have_fn( + const char *path, + struct server_id server, + const struct notify_instance *instance, + void *private_data) +{ + struct notifyd_have_state *state = private_data; + state->found |= server_id_equal(&server, &state->self); + return true; +} + +static bool notifyd_have_self(struct messaging_context *msg_ctx) +{ + struct notifyd_have_state state = { + .self = messaging_server_id(msg_ctx), + }; + NTSTATUS status; + + status = notify_walk(msg_ctx, notifyd_have_fn, &state); + if (!NT_STATUS_IS_OK(status)) { + return false; + } + return state.found; +} + +static bool test_notifyd_dbtest1(struct torture_context *tctx) +{ + struct tevent_context *ev = tctx->ev; + struct messaging_context *msg_ctx = NULL; + struct tevent_req *req = NULL; + struct server_id_db *names = NULL; + struct server_id notifyd; + NTSTATUS status; + bool ok; + + /* + * Make sure fcn_wait_send adds us to the notifyd internal + * database and that cancelling the fcn request removes us + * again. + */ + + lp_load_global(tctx->lp_ctx->szConfigFile); + + msg_ctx = messaging_init(tctx, ev); + torture_assert_not_null(tctx, msg_ctx, "messaging_init"); + + names = messaging_names_db(msg_ctx); + ok = server_id_db_lookup_one(names, "notify-daemon", ¬ifyd); + torture_assert(tctx, ok, "server_id_db_lookup_one"); + + req = fcn_wait_send( + msg_ctx, ev, msg_ctx, notifyd, "/x", UINT32_MAX, UINT32_MAX); + torture_assert_not_null(tctx, req, "fcn_wait_send"); + + ok = notifyd_have_self(msg_ctx); + torture_assert(tctx, ok, "notifyd_have_self"); + + ok = tevent_req_cancel(req); + torture_assert(tctx, ok, "tevent_req_cancel"); + + ok = tevent_req_poll(req, ev); + torture_assert(tctx, ok, "tevent_req_poll"); + + status = fcn_wait_recv(req, NULL, NULL, NULL, NULL); + torture_assert_ntstatus_equal( + tctx, status, NT_STATUS_CANCELLED, "fcn_wait_recv"); + TALLOC_FREE(req); + + ok = notifyd_have_self(msg_ctx); + torture_assert(tctx, !ok, "tevent_req_poll"); + TALLOC_FREE(msg_ctx); + + return true; +} + +NTSTATUS torture_notifyd_init(TALLOC_CTX *mem_ctx); +NTSTATUS torture_notifyd_init(TALLOC_CTX *mem_ctx) +{ + struct torture_suite *suite = NULL; + struct torture_tcase *tcase = NULL; + bool ok; + + suite = torture_suite_create(mem_ctx, "notifyd"); + if (suite == NULL) { + goto fail; + } + + tcase = torture_suite_add_simple_test( + suite, "trigger1", test_notifyd_trigger1); + if (tcase == NULL) { + goto fail; + } + + tcase = torture_suite_add_simple_test( + suite, "dbtest1", test_notifyd_dbtest1); + if (tcase == NULL) { + goto fail; + } + suite->description = "notifyd unit tests"; + + ok = torture_register_suite(mem_ctx, suite); + if (!ok) { + goto fail; + } + return NT_STATUS_OK; +fail: + TALLOC_FREE(suite); + return NT_STATUS_NO_MEMORY; +} diff --git a/source3/smbd/notifyd/tests.c b/source3/smbd/notifyd/tests.c new file mode 100644 index 0000000..6bcce6a --- /dev/null +++ b/source3/smbd/notifyd/tests.c @@ -0,0 +1,118 @@ +/* + * Unix SMB/CIFS implementation. + * + * Copyright (C) Volker Lendecke 2014 + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +#include "replace.h" +#include "notifyd.h" +#include "messages.h" +#include "lib/util/server_id_db.h" + +int main(int argc, const char *argv[]) +{ + TALLOC_CTX *frame = talloc_stackframe(); + struct tevent_context *ev; + struct messaging_context *msg_ctx; + struct server_id_db *names; + struct server_id notifyd; + struct tevent_req *req; + unsigned i; + bool ok; + + if (argc != 2) { + fprintf(stderr, "Usage: %s <smb.conf-file>\n", argv[0]); + exit(1); + } + + setup_logging(argv[0], DEBUG_STDOUT); + lp_load_global(argv[1]); + + ev = tevent_context_init(NULL); + if (ev == NULL) { + fprintf(stderr, "tevent_context_init failed\n"); + exit(1); + } + + msg_ctx = messaging_init(ev, ev); + if (msg_ctx == NULL) { + fprintf(stderr, "messaging_init failed\n"); + exit(1); + } + + names = messaging_names_db(msg_ctx); + + ok = server_id_db_lookup_one(names, "notify-daemon", ¬ifyd); + if (!ok) { + fprintf(stderr, "no notifyd\n"); + exit(1); + } + + for (i=0; i<50000; i++) { + struct notify_rec_change_msg msg = { + .instance.filter = UINT32_MAX, + .instance.subdir_filter = UINT32_MAX + }; + char path[64]; + size_t len; + struct iovec iov[2]; + NTSTATUS status; + + len = snprintf(path, sizeof(path), "/tmp%u", i); + + iov[0].iov_base = &msg; + iov[0].iov_len = offsetof(struct notify_rec_change_msg, path); + iov[1].iov_base = path; + iov[1].iov_len = len+1; + + status = messaging_send_iov( + msg_ctx, notifyd, MSG_SMB_NOTIFY_REC_CHANGE, + iov, ARRAY_SIZE(iov), NULL, 0); + if (!NT_STATUS_IS_OK(status)) { + fprintf(stderr, "messaging_send_iov returned %s\n", + nt_errstr(status)); + exit(1); + } + + msg.instance.filter = 0; + msg.instance.subdir_filter = 0; + + status = messaging_send_iov( + msg_ctx, notifyd, MSG_SMB_NOTIFY_REC_CHANGE, + iov, ARRAY_SIZE(iov), NULL, 0); + if (!NT_STATUS_IS_OK(status)) { + fprintf(stderr, "messaging_send_iov returned %s\n", + nt_errstr(status)); + exit(1); + } + } + + req = messaging_read_send(ev, ev, msg_ctx, MSG_PONG); + if (req == NULL) { + fprintf(stderr, "messaging_read_send failed\n"); + exit(1); + } + messaging_send_buf(msg_ctx, notifyd, MSG_PING, NULL, 0); + + ok = tevent_req_poll(req, ev); + if (!ok) { + fprintf(stderr, "tevent_req_poll failed\n"); + exit(1); + } + + TALLOC_FREE(frame); + return 0; +} diff --git a/source3/smbd/notifyd/wscript_build b/source3/smbd/notifyd/wscript_build new file mode 100644 index 0000000..6880a31 --- /dev/null +++ b/source3/smbd/notifyd/wscript_build @@ -0,0 +1,44 @@ +#!/usr/bin/env python + +bld.SAMBA3_SUBSYSTEM('fcn_wait', + source='fcn_wait.c', + deps='samba3core') + +bld.SAMBA3_SUBSYSTEM('notifyd_db', + source='notifyd_entry.c notifyd_db.c', + deps='samba-debug dbwrap errors3') + +bld.SAMBA3_SUBSYSTEM('notifyd', + source='notifyd.c', + deps=''' + util_tdb + TDB_LIB + messages_util + notifyd_db + ''') + +bld.SAMBA3_BINARY('notifyd-tests', + source='tests.c', + install=False, + deps=''' + smbconf + ''') + +bld.SAMBA3_BINARY('notifydd', + source='notifydd.c', + install=False, + deps='''notifyd + smbconf + ''') + +TORTURE_NOTIFYD_SOURCE='test_notifyd.c' +TORTURE_NOTIFYD_DEPS='fcn_wait notifyd_db' + +bld.SAMBA_MODULE('TORTURE_NOTIFYD', + source=TORTURE_NOTIFYD_SOURCE, + subsystem='smbtorture', + init_function='torture_notifyd_init', + deps=TORTURE_NOTIFYD_DEPS, + internal_module=True, + enabled=bld.PYTHON_BUILD_IS_ENABLED() + ) |