diff options
Diffstat (limited to 'source3/smbd/notifyd/notifyd.c')
-rw-r--r-- | source3/smbd/notifyd/notifyd.c | 1428 |
1 files changed, 1428 insertions, 0 deletions
diff --git a/source3/smbd/notifyd/notifyd.c b/source3/smbd/notifyd/notifyd.c new file mode 100644 index 0000000..475c8e9 --- /dev/null +++ b/source3/smbd/notifyd/notifyd.c @@ -0,0 +1,1428 @@ +/* + * Unix SMB/CIFS implementation. + * + * Copyright (C) Volker Lendecke 2014 + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +#include "replace.h" +#include <tevent.h> +#include "notifyd_private.h" +#include "lib/util/server_id.h" +#include "lib/util/data_blob.h" +#include "librpc/gen_ndr/notify.h" +#include "librpc/gen_ndr/messaging.h" +#include "librpc/gen_ndr/server_id.h" +#include "lib/dbwrap/dbwrap.h" +#include "lib/dbwrap/dbwrap_rbt.h" +#include "messages.h" +#include "tdb.h" +#include "util_tdb.h" +#include "notifyd.h" +#include "lib/util/server_id_db.h" +#include "lib/util/tevent_unix.h" +#include "lib/util/tevent_ntstatus.h" +#include "ctdbd_conn.h" +#include "ctdb_srvids.h" +#include "server_id_db_util.h" +#include "lib/util/iov_buf.h" +#include "messages_util.h" + +#ifdef CLUSTER_SUPPORT +#include "ctdb_protocol.h" +#endif + +struct notifyd_peer; + +/* + * All of notifyd's state + */ + +struct notifyd_state { + struct tevent_context *ev; + struct messaging_context *msg_ctx; + struct ctdbd_connection *ctdbd_conn; + + /* + * Database of everything clients show interest in. Indexed by + * absolute path. The database keys are not 0-terminated + * to allow the criticial operation, notifyd_trigger, to walk + * the structure from the top without adding intermediate 0s. + * The database records contain an array of + * + * struct notifyd_instance + * + * to be maintained and parsed by notifyd_parse_entry() + */ + struct db_context *entries; + + /* + * In the cluster case, this is the place where we store a log + * of all MSG_SMB_NOTIFY_REC_CHANGE messages. We just 1:1 + * forward them to our peer notifyd's in the cluster once a + * second or when the log grows too large. + */ + + struct messaging_reclog *log; + + /* + * Array of companion notifyd's in a cluster. Every notifyd + * broadcasts its messaging_reclog to every other notifyd in + * the cluster. This is done by making ctdb send a message to + * srvid CTDB_SRVID_SAMBA_NOTIFY_PROXY with destination node + * number CTDB_BROADCAST_CONNECTED. Everybody in the cluster who + * had called register_with_ctdbd this srvid will receive the + * broadcasts. + * + * Database replication happens via these broadcasts. Also, + * they serve as liveness indication. If a notifyd receives a + * broadcast from an unknown peer, it will create one for this + * srvid. Also when we don't hear anything from a peer for a + * while, we will discard it. + */ + + struct notifyd_peer **peers; + size_t num_peers; + + sys_notify_watch_fn sys_notify_watch; + struct sys_notify_context *sys_notify_ctx; +}; + +struct notifyd_peer { + struct notifyd_state *state; + struct server_id pid; + uint64_t rec_index; + struct db_context *db; + time_t last_broadcast; +}; + +static void notifyd_rec_change(struct messaging_context *msg_ctx, + void *private_data, uint32_t msg_type, + struct server_id src, DATA_BLOB *data); +static void notifyd_trigger(struct messaging_context *msg_ctx, + void *private_data, uint32_t msg_type, + struct server_id src, DATA_BLOB *data); +static void notifyd_get_db(struct messaging_context *msg_ctx, + void *private_data, uint32_t msg_type, + struct server_id src, DATA_BLOB *data); + +#ifdef CLUSTER_SUPPORT +static void notifyd_got_db(struct messaging_context *msg_ctx, + void *private_data, uint32_t msg_type, + struct server_id src, DATA_BLOB *data); +static void notifyd_broadcast_reclog(struct ctdbd_connection *ctdbd_conn, + struct server_id src, + struct messaging_reclog *log); +#endif +static void notifyd_sys_callback(struct sys_notify_context *ctx, + void *private_data, struct notify_event *ev, + uint32_t filter); + +#ifdef CLUSTER_SUPPORT +static struct tevent_req *notifyd_broadcast_reclog_send( + TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdbd_connection *ctdbd_conn, struct server_id src, + struct messaging_reclog *log); +static int notifyd_broadcast_reclog_recv(struct tevent_req *req); + +static struct tevent_req *notifyd_clean_peers_send( + TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct notifyd_state *notifyd); +static int notifyd_clean_peers_recv(struct tevent_req *req); +#endif + +static int sys_notify_watch_dummy( + TALLOC_CTX *mem_ctx, + struct sys_notify_context *ctx, + const char *path, + uint32_t *filter, + uint32_t *subdir_filter, + void (*callback)(struct sys_notify_context *ctx, + void *private_data, + struct notify_event *ev, + uint32_t filter), + void *private_data, + void *handle_p) +{ + void **handle = handle_p; + *handle = NULL; + return 0; +} + +#ifdef CLUSTER_SUPPORT +static void notifyd_broadcast_reclog_finished(struct tevent_req *subreq); +static void notifyd_clean_peers_finished(struct tevent_req *subreq); +static int notifyd_snoop_broadcast(struct tevent_context *ev, + uint32_t src_vnn, uint32_t dst_vnn, + uint64_t dst_srvid, + const uint8_t *msg, size_t msglen, + void *private_data); +#endif + +struct tevent_req *notifyd_send(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct messaging_context *msg_ctx, + struct ctdbd_connection *ctdbd_conn, + sys_notify_watch_fn sys_notify_watch, + struct sys_notify_context *sys_notify_ctx) +{ + struct tevent_req *req; +#ifdef CLUSTER_SUPPORT + struct tevent_req *subreq; +#endif + struct notifyd_state *state; + struct server_id_db *names_db; + NTSTATUS status; + int ret; + + req = tevent_req_create(mem_ctx, &state, struct notifyd_state); + if (req == NULL) { + return NULL; + } + state->ev = ev; + state->msg_ctx = msg_ctx; + state->ctdbd_conn = ctdbd_conn; + + if (sys_notify_watch == NULL) { + sys_notify_watch = sys_notify_watch_dummy; + } + + state->sys_notify_watch = sys_notify_watch; + state->sys_notify_ctx = sys_notify_ctx; + + state->entries = db_open_rbt(state); + if (tevent_req_nomem(state->entries, req)) { + return tevent_req_post(req, ev); + } + + status = messaging_register(msg_ctx, state, MSG_SMB_NOTIFY_REC_CHANGE, + notifyd_rec_change); + if (tevent_req_nterror(req, status)) { + return tevent_req_post(req, ev); + } + + status = messaging_register(msg_ctx, state, MSG_SMB_NOTIFY_TRIGGER, + notifyd_trigger); + if (tevent_req_nterror(req, status)) { + goto deregister_rec_change; + } + + status = messaging_register(msg_ctx, state, MSG_SMB_NOTIFY_GET_DB, + notifyd_get_db); + if (tevent_req_nterror(req, status)) { + goto deregister_trigger; + } + + names_db = messaging_names_db(msg_ctx); + + ret = server_id_db_set_exclusive(names_db, "notify-daemon"); + if (ret != 0) { + DBG_DEBUG("server_id_db_add failed: %s\n", + strerror(ret)); + tevent_req_error(req, ret); + goto deregister_get_db; + } + + if (ctdbd_conn == NULL) { + /* + * No cluster around, skip the database replication + * engine + */ + return req; + } + +#ifdef CLUSTER_SUPPORT + status = messaging_register(msg_ctx, state, MSG_SMB_NOTIFY_DB, + notifyd_got_db); + if (tevent_req_nterror(req, status)) { + goto deregister_get_db; + } + + state->log = talloc_zero(state, struct messaging_reclog); + if (tevent_req_nomem(state->log, req)) { + goto deregister_db; + } + + subreq = notifyd_broadcast_reclog_send( + state->log, ev, ctdbd_conn, + messaging_server_id(msg_ctx), + state->log); + if (tevent_req_nomem(subreq, req)) { + goto deregister_db; + } + tevent_req_set_callback(subreq, + notifyd_broadcast_reclog_finished, + req); + + subreq = notifyd_clean_peers_send(state, ev, state); + if (tevent_req_nomem(subreq, req)) { + goto deregister_db; + } + tevent_req_set_callback(subreq, notifyd_clean_peers_finished, + req); + + ret = register_with_ctdbd(ctdbd_conn, + CTDB_SRVID_SAMBA_NOTIFY_PROXY, + notifyd_snoop_broadcast, state); + if (ret != 0) { + tevent_req_error(req, ret); + goto deregister_db; + } +#endif + + return req; + +#ifdef CLUSTER_SUPPORT +deregister_db: + messaging_deregister(msg_ctx, MSG_SMB_NOTIFY_DB, state); +#endif +deregister_get_db: + messaging_deregister(msg_ctx, MSG_SMB_NOTIFY_GET_DB, state); +deregister_trigger: + messaging_deregister(msg_ctx, MSG_SMB_NOTIFY_TRIGGER, state); +deregister_rec_change: + messaging_deregister(msg_ctx, MSG_SMB_NOTIFY_REC_CHANGE, state); + return tevent_req_post(req, ev); +} + +#ifdef CLUSTER_SUPPORT + +static void notifyd_broadcast_reclog_finished(struct tevent_req *subreq) +{ + struct tevent_req *req = tevent_req_callback_data( + subreq, struct tevent_req); + int ret; + + ret = notifyd_broadcast_reclog_recv(subreq); + TALLOC_FREE(subreq); + tevent_req_error(req, ret); +} + +static void notifyd_clean_peers_finished(struct tevent_req *subreq) +{ + struct tevent_req *req = tevent_req_callback_data( + subreq, struct tevent_req); + int ret; + + ret = notifyd_clean_peers_recv(subreq); + TALLOC_FREE(subreq); + tevent_req_error(req, ret); +} + +#endif + +int notifyd_recv(struct tevent_req *req) +{ + return tevent_req_simple_recv_unix(req); +} + +static bool notifyd_apply_rec_change( + const struct server_id *client, + const char *path, size_t pathlen, + const struct notify_instance *chg, + struct db_context *entries, + sys_notify_watch_fn sys_notify_watch, + struct sys_notify_context *sys_notify_ctx, + struct messaging_context *msg_ctx) +{ + struct db_record *rec = NULL; + struct notifyd_instance *instances = NULL; + size_t num_instances; + size_t i; + struct notifyd_instance *instance = NULL; + TDB_DATA value; + NTSTATUS status; + bool ok = false; + + if (pathlen == 0) { + DBG_WARNING("pathlen==0\n"); + return false; + } + if (path[pathlen-1] != '\0') { + DBG_WARNING("path not 0-terminated\n"); + return false; + } + + DBG_DEBUG("path=%s, filter=%"PRIu32", subdir_filter=%"PRIu32", " + "private_data=%p\n", + path, + chg->filter, + chg->subdir_filter, + chg->private_data); + + rec = dbwrap_fetch_locked( + entries, entries, + make_tdb_data((const uint8_t *)path, pathlen-1)); + + if (rec == NULL) { + DBG_WARNING("dbwrap_fetch_locked failed\n"); + goto fail; + } + + num_instances = 0; + value = dbwrap_record_get_value(rec); + + if (value.dsize != 0) { + if (!notifyd_parse_entry(value.dptr, value.dsize, NULL, + &num_instances)) { + goto fail; + } + } + + /* + * Overallocate by one instance to avoid a realloc when adding + */ + instances = talloc_array(rec, struct notifyd_instance, + num_instances + 1); + if (instances == NULL) { + DBG_WARNING("talloc failed\n"); + goto fail; + } + + if (value.dsize != 0) { + memcpy(instances, value.dptr, value.dsize); + } + + for (i=0; i<num_instances; i++) { + instance = &instances[i]; + + if (server_id_equal(&instance->client, client) && + (instance->instance.private_data == chg->private_data)) { + break; + } + } + + if (i < num_instances) { + instance->instance = *chg; + } else { + /* + * We've overallocated for one instance + */ + instance = &instances[num_instances]; + + *instance = (struct notifyd_instance) { + .client = *client, + .instance = *chg, + .internal_filter = chg->filter, + .internal_subdir_filter = chg->subdir_filter + }; + + num_instances += 1; + } + + if ((instance->instance.filter != 0) || + (instance->instance.subdir_filter != 0)) { + int ret; + + TALLOC_FREE(instance->sys_watch); + + ret = sys_notify_watch(entries, sys_notify_ctx, path, + &instance->internal_filter, + &instance->internal_subdir_filter, + notifyd_sys_callback, msg_ctx, + &instance->sys_watch); + if (ret != 0) { + DBG_WARNING("sys_notify_watch for [%s] returned %s\n", + path, strerror(errno)); + } + } + + if ((instance->instance.filter == 0) && + (instance->instance.subdir_filter == 0)) { + /* This is a delete request */ + TALLOC_FREE(instance->sys_watch); + *instance = instances[num_instances-1]; + num_instances -= 1; + } + + DBG_DEBUG("%s has %zu instances\n", path, num_instances); + + if (num_instances == 0) { + status = dbwrap_record_delete(rec); + if (!NT_STATUS_IS_OK(status)) { + DBG_WARNING("dbwrap_record_delete returned %s\n", + nt_errstr(status)); + goto fail; + } + } else { + value = make_tdb_data( + (uint8_t *)instances, + sizeof(struct notifyd_instance) * num_instances); + + status = dbwrap_record_store(rec, value, 0); + if (!NT_STATUS_IS_OK(status)) { + DBG_WARNING("dbwrap_record_store returned %s\n", + nt_errstr(status)); + goto fail; + } + } + + ok = true; +fail: + TALLOC_FREE(rec); + return ok; +} + +static void notifyd_sys_callback(struct sys_notify_context *ctx, + void *private_data, struct notify_event *ev, + uint32_t filter) +{ + struct messaging_context *msg_ctx = talloc_get_type_abort( + private_data, struct messaging_context); + struct notify_trigger_msg msg; + struct iovec iov[4]; + char slash = '/'; + + msg = (struct notify_trigger_msg) { + .when = timespec_current(), + .action = ev->action, + .filter = filter, + }; + + iov[0].iov_base = &msg; + iov[0].iov_len = offsetof(struct notify_trigger_msg, path); + iov[1].iov_base = discard_const_p(char, ev->dir); + iov[1].iov_len = strlen(ev->dir); + iov[2].iov_base = &slash; + iov[2].iov_len = 1; + iov[3].iov_base = discard_const_p(char, ev->path); + iov[3].iov_len = strlen(ev->path)+1; + + messaging_send_iov( + msg_ctx, messaging_server_id(msg_ctx), + MSG_SMB_NOTIFY_TRIGGER, iov, ARRAY_SIZE(iov), NULL, 0); +} + +static bool notifyd_parse_rec_change(uint8_t *buf, size_t bufsize, + struct notify_rec_change_msg **pmsg, + size_t *pathlen) +{ + struct notify_rec_change_msg *msg; + + if (bufsize < offsetof(struct notify_rec_change_msg, path) + 1) { + DBG_WARNING("message too short, ignoring: %zu\n", bufsize); + return false; + } + + *pmsg = msg = (struct notify_rec_change_msg *)buf; + *pathlen = bufsize - offsetof(struct notify_rec_change_msg, path); + + DBG_DEBUG("Got rec_change_msg filter=%"PRIu32", " + "subdir_filter=%"PRIu32", private_data=%p, path=%.*s\n", + msg->instance.filter, + msg->instance.subdir_filter, + msg->instance.private_data, + (int)(*pathlen), + msg->path); + + return true; +} + +static void notifyd_rec_change(struct messaging_context *msg_ctx, + void *private_data, uint32_t msg_type, + struct server_id src, DATA_BLOB *data) +{ + struct notifyd_state *state = talloc_get_type_abort( + private_data, struct notifyd_state); + struct server_id_buf idbuf; + struct notify_rec_change_msg *msg; + size_t pathlen; + bool ok; + struct notify_instance instance; + + DBG_DEBUG("Got %zu bytes from %s\n", data->length, + server_id_str_buf(src, &idbuf)); + + ok = notifyd_parse_rec_change(data->data, data->length, + &msg, &pathlen); + if (!ok) { + return; + } + + memcpy(&instance, &msg->instance, sizeof(instance)); /* avoid SIGBUS */ + + ok = notifyd_apply_rec_change( + &src, msg->path, pathlen, &instance, + state->entries, state->sys_notify_watch, state->sys_notify_ctx, + state->msg_ctx); + if (!ok) { + DBG_DEBUG("notifyd_apply_rec_change failed, ignoring\n"); + return; + } + + if ((state->log == NULL) || (state->ctdbd_conn == NULL)) { + return; + } + +#ifdef CLUSTER_SUPPORT + { + + struct messaging_rec **tmp; + struct messaging_reclog *log; + struct iovec iov = { .iov_base = data->data, .iov_len = data->length }; + + log = state->log; + + tmp = talloc_realloc(log, log->recs, struct messaging_rec *, + log->num_recs+1); + if (tmp == NULL) { + DBG_WARNING("talloc_realloc failed, ignoring\n"); + return; + } + log->recs = tmp; + + log->recs[log->num_recs] = messaging_rec_create( + log->recs, src, messaging_server_id(msg_ctx), + msg_type, &iov, 1, NULL, 0); + + if (log->recs[log->num_recs] == NULL) { + DBG_WARNING("messaging_rec_create failed, ignoring\n"); + return; + } + + log->num_recs += 1; + + if (log->num_recs >= 100) { + /* + * Don't let the log grow too large + */ + notifyd_broadcast_reclog(state->ctdbd_conn, + messaging_server_id(msg_ctx), log); + } + + } +#endif +} + +struct notifyd_trigger_state { + struct messaging_context *msg_ctx; + struct notify_trigger_msg *msg; + bool recursive; + bool covered_by_sys_notify; +}; + +static void notifyd_trigger_parser(TDB_DATA key, TDB_DATA data, + void *private_data); + +static void notifyd_trigger(struct messaging_context *msg_ctx, + void *private_data, uint32_t msg_type, + struct server_id src, DATA_BLOB *data) +{ + struct notifyd_state *state = talloc_get_type_abort( + private_data, struct notifyd_state); + struct server_id my_id = messaging_server_id(msg_ctx); + struct notifyd_trigger_state tstate; + const char *path; + const char *p, *next_p; + + if (data->length < offsetof(struct notify_trigger_msg, path) + 1) { + DBG_WARNING("message too short, ignoring: %zu\n", + data->length); + return; + } + if (data->data[data->length-1] != 0) { + DBG_WARNING("path not 0-terminated, ignoring\n");; + return; + } + + tstate.msg_ctx = msg_ctx; + + tstate.covered_by_sys_notify = (src.vnn == my_id.vnn); + tstate.covered_by_sys_notify &= !server_id_equal(&src, &my_id); + + tstate.msg = (struct notify_trigger_msg *)data->data; + path = tstate.msg->path; + + DBG_DEBUG("Got trigger_msg action=%"PRIu32", filter=%"PRIu32", " + "path=%s\n", + tstate.msg->action, + tstate.msg->filter, + path); + + if (path[0] != '/') { + DBG_WARNING("path %s does not start with /, ignoring\n", + path); + return; + } + + for (p = strchr(path+1, '/'); p != NULL; p = next_p) { + ptrdiff_t path_len = p - path; + TDB_DATA key; + uint32_t i; + + next_p = strchr(p+1, '/'); + tstate.recursive = (next_p != NULL); + + DBG_DEBUG("Trying path %.*s\n", (int)path_len, path); + + key = (TDB_DATA) { .dptr = discard_const_p(uint8_t, path), + .dsize = path_len }; + + dbwrap_parse_record(state->entries, key, + notifyd_trigger_parser, &tstate); + + if (state->peers == NULL) { + continue; + } + + if (src.vnn != my_id.vnn) { + continue; + } + + for (i=0; i<state->num_peers; i++) { + if (state->peers[i]->db == NULL) { + /* + * Inactive peer, did not get a db yet + */ + continue; + } + dbwrap_parse_record(state->peers[i]->db, key, + notifyd_trigger_parser, &tstate); + } + } +} + +static void notifyd_send_delete(struct messaging_context *msg_ctx, + TDB_DATA key, + struct notifyd_instance *instance); + +static void notifyd_trigger_parser(TDB_DATA key, TDB_DATA data, + void *private_data) + +{ + struct notifyd_trigger_state *tstate = private_data; + struct notify_event_msg msg = { .action = tstate->msg->action, + .when = tstate->msg->when }; + struct iovec iov[2]; + size_t path_len = key.dsize; + struct notifyd_instance *instances = NULL; + size_t num_instances = 0; + size_t i; + + if (!notifyd_parse_entry(data.dptr, data.dsize, &instances, + &num_instances)) { + DBG_DEBUG("Could not parse notifyd_entry\n"); + return; + } + + DBG_DEBUG("Found %zu instances for %.*s\n", + num_instances, + (int)key.dsize, + (char *)key.dptr); + + iov[0].iov_base = &msg; + iov[0].iov_len = offsetof(struct notify_event_msg, path); + iov[1].iov_base = tstate->msg->path + path_len + 1; + iov[1].iov_len = strlen((char *)(iov[1].iov_base)) + 1; + + for (i=0; i<num_instances; i++) { + struct notifyd_instance *instance = &instances[i]; + struct server_id_buf idbuf; + uint32_t i_filter; + NTSTATUS status; + + if (tstate->covered_by_sys_notify) { + if (tstate->recursive) { + i_filter = instance->internal_subdir_filter; + } else { + i_filter = instance->internal_filter; + } + } else { + if (tstate->recursive) { + i_filter = instance->instance.subdir_filter; + } else { + i_filter = instance->instance.filter; + } + } + + if ((i_filter & tstate->msg->filter) == 0) { + continue; + } + + msg.private_data = instance->instance.private_data; + + status = messaging_send_iov( + tstate->msg_ctx, instance->client, + MSG_PVFS_NOTIFY, iov, ARRAY_SIZE(iov), NULL, 0); + + DBG_DEBUG("messaging_send_iov to %s returned %s\n", + server_id_str_buf(instance->client, &idbuf), + nt_errstr(status)); + + if (NT_STATUS_EQUAL(status, NT_STATUS_OBJECT_NAME_NOT_FOUND) && + procid_is_local(&instance->client)) { + /* + * That process has died + */ + notifyd_send_delete(tstate->msg_ctx, key, instance); + continue; + } + + if (!NT_STATUS_IS_OK(status)) { + DBG_WARNING("messaging_send_iov returned %s\n", + nt_errstr(status)); + } + } +} + +/* + * Send a delete request to ourselves to properly discard a notify + * record for an smbd that has died. + */ + +static void notifyd_send_delete(struct messaging_context *msg_ctx, + TDB_DATA key, + struct notifyd_instance *instance) +{ + struct notify_rec_change_msg msg = { + .instance.private_data = instance->instance.private_data + }; + uint8_t nul = 0; + struct iovec iov[3]; + int ret; + + /* + * Send a rec_change to ourselves to delete a dead entry + */ + + iov[0] = (struct iovec) { + .iov_base = &msg, + .iov_len = offsetof(struct notify_rec_change_msg, path) }; + iov[1] = (struct iovec) { .iov_base = key.dptr, .iov_len = key.dsize }; + iov[2] = (struct iovec) { .iov_base = &nul, .iov_len = sizeof(nul) }; + + ret = messaging_send_iov_from( + msg_ctx, instance->client, messaging_server_id(msg_ctx), + MSG_SMB_NOTIFY_REC_CHANGE, iov, ARRAY_SIZE(iov), NULL, 0); + + if (ret != 0) { + DBG_WARNING("messaging_send_iov_from returned %s\n", + strerror(ret)); + } +} + +static void notifyd_get_db(struct messaging_context *msg_ctx, + void *private_data, uint32_t msg_type, + struct server_id src, DATA_BLOB *data) +{ + struct notifyd_state *state = talloc_get_type_abort( + private_data, struct notifyd_state); + struct server_id_buf id1, id2; + NTSTATUS status; + uint64_t rec_index = UINT64_MAX; + uint8_t index_buf[sizeof(uint64_t)]; + size_t dbsize; + uint8_t *buf; + struct iovec iov[2]; + + dbsize = dbwrap_marshall(state->entries, NULL, 0); + + buf = talloc_array(talloc_tos(), uint8_t, dbsize); + if (buf == NULL) { + DBG_WARNING("talloc_array(%zu) failed\n", dbsize); + return; + } + + dbsize = dbwrap_marshall(state->entries, buf, dbsize); + + if (dbsize != talloc_get_size(buf)) { + DBG_DEBUG("dbsize changed: %zu->%zu\n", + talloc_get_size(buf), + dbsize); + TALLOC_FREE(buf); + return; + } + + if (state->log != NULL) { + rec_index = state->log->rec_index; + } + SBVAL(index_buf, 0, rec_index); + + iov[0] = (struct iovec) { .iov_base = index_buf, + .iov_len = sizeof(index_buf) }; + iov[1] = (struct iovec) { .iov_base = buf, + .iov_len = dbsize }; + + DBG_DEBUG("Sending %zu bytes to %s->%s\n", + iov_buflen(iov, ARRAY_SIZE(iov)), + server_id_str_buf(messaging_server_id(msg_ctx), &id1), + server_id_str_buf(src, &id2)); + + status = messaging_send_iov(msg_ctx, src, MSG_SMB_NOTIFY_DB, + iov, ARRAY_SIZE(iov), NULL, 0); + TALLOC_FREE(buf); + if (!NT_STATUS_IS_OK(status)) { + DBG_WARNING("messaging_send_iov failed: %s\n", + nt_errstr(status)); + } +} + +#ifdef CLUSTER_SUPPORT + +static int notifyd_add_proxy_syswatches(struct db_record *rec, + void *private_data); + +static void notifyd_got_db(struct messaging_context *msg_ctx, + void *private_data, uint32_t msg_type, + struct server_id src, DATA_BLOB *data) +{ + struct notifyd_state *state = talloc_get_type_abort( + private_data, struct notifyd_state); + struct notifyd_peer *p = NULL; + struct server_id_buf idbuf; + NTSTATUS status; + int count; + size_t i; + + for (i=0; i<state->num_peers; i++) { + if (server_id_equal(&src, &state->peers[i]->pid)) { + p = state->peers[i]; + break; + } + } + + if (p == NULL) { + DBG_DEBUG("Did not find peer for db from %s\n", + server_id_str_buf(src, &idbuf)); + return; + } + + if (data->length < 8) { + DBG_DEBUG("Got short db length %zu from %s\n", data->length, + server_id_str_buf(src, &idbuf)); + TALLOC_FREE(p); + return; + } + + p->rec_index = BVAL(data->data, 0); + + p->db = db_open_rbt(p); + if (p->db == NULL) { + DBG_DEBUG("db_open_rbt failed\n"); + TALLOC_FREE(p); + return; + } + + status = dbwrap_unmarshall(p->db, data->data + 8, + data->length - 8); + if (!NT_STATUS_IS_OK(status)) { + DBG_DEBUG("dbwrap_unmarshall returned %s for db %s\n", + nt_errstr(status), + server_id_str_buf(src, &idbuf)); + TALLOC_FREE(p); + return; + } + + dbwrap_traverse_read(p->db, notifyd_add_proxy_syswatches, state, + &count); + + DBG_DEBUG("Database from %s contained %d records\n", + server_id_str_buf(src, &idbuf), + count); +} + +static void notifyd_broadcast_reclog(struct ctdbd_connection *ctdbd_conn, + struct server_id src, + struct messaging_reclog *log) +{ + enum ndr_err_code ndr_err; + uint8_t msghdr[MESSAGE_HDR_LENGTH]; + DATA_BLOB blob; + struct iovec iov[2]; + int ret; + + if (log == NULL) { + return; + } + + DBG_DEBUG("rec_index=%"PRIu64", num_recs=%"PRIu32"\n", + log->rec_index, + log->num_recs); + + message_hdr_put(msghdr, MSG_SMB_NOTIFY_REC_CHANGES, src, + (struct server_id) {0 }); + iov[0] = (struct iovec) { .iov_base = msghdr, + .iov_len = sizeof(msghdr) }; + + ndr_err = ndr_push_struct_blob( + &blob, log, log, + (ndr_push_flags_fn_t)ndr_push_messaging_reclog); + if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) { + DBG_WARNING("ndr_push_messaging_recs failed: %s\n", + ndr_errstr(ndr_err)); + goto done; + } + iov[1] = (struct iovec) { .iov_base = blob.data, + .iov_len = blob.length }; + + ret = ctdbd_messaging_send_iov( + ctdbd_conn, CTDB_BROADCAST_CONNECTED, + CTDB_SRVID_SAMBA_NOTIFY_PROXY, iov, ARRAY_SIZE(iov)); + TALLOC_FREE(blob.data); + if (ret != 0) { + DBG_WARNING("ctdbd_messaging_send failed: %s\n", + strerror(ret)); + goto done; + } + + log->rec_index += 1; + +done: + log->num_recs = 0; + TALLOC_FREE(log->recs); +} + +struct notifyd_broadcast_reclog_state { + struct tevent_context *ev; + struct ctdbd_connection *ctdbd_conn; + struct server_id src; + struct messaging_reclog *log; +}; + +static void notifyd_broadcast_reclog_next(struct tevent_req *subreq); + +static struct tevent_req *notifyd_broadcast_reclog_send( + TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdbd_connection *ctdbd_conn, struct server_id src, + struct messaging_reclog *log) +{ + struct tevent_req *req, *subreq; + struct notifyd_broadcast_reclog_state *state; + + req = tevent_req_create(mem_ctx, &state, + struct notifyd_broadcast_reclog_state); + if (req == NULL) { + return NULL; + } + state->ev = ev; + state->ctdbd_conn = ctdbd_conn; + state->src = src; + state->log = log; + + subreq = tevent_wakeup_send(state, state->ev, + timeval_current_ofs_msec(1000)); + if (tevent_req_nomem(subreq, req)) { + return tevent_req_post(req, ev); + } + tevent_req_set_callback(subreq, notifyd_broadcast_reclog_next, req); + return req; +} + +static void notifyd_broadcast_reclog_next(struct tevent_req *subreq) +{ + struct tevent_req *req = tevent_req_callback_data( + subreq, struct tevent_req); + struct notifyd_broadcast_reclog_state *state = tevent_req_data( + req, struct notifyd_broadcast_reclog_state); + bool ok; + + ok = tevent_wakeup_recv(subreq); + TALLOC_FREE(subreq); + if (!ok) { + tevent_req_oom(req); + return; + } + + notifyd_broadcast_reclog(state->ctdbd_conn, state->src, state->log); + + subreq = tevent_wakeup_send(state, state->ev, + timeval_current_ofs_msec(1000)); + if (tevent_req_nomem(subreq, req)) { + return; + } + tevent_req_set_callback(subreq, notifyd_broadcast_reclog_next, req); +} + +static int notifyd_broadcast_reclog_recv(struct tevent_req *req) +{ + return tevent_req_simple_recv_unix(req); +} + +struct notifyd_clean_peers_state { + struct tevent_context *ev; + struct notifyd_state *notifyd; +}; + +static void notifyd_clean_peers_next(struct tevent_req *subreq); + +static struct tevent_req *notifyd_clean_peers_send( + TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct notifyd_state *notifyd) +{ + struct tevent_req *req, *subreq; + struct notifyd_clean_peers_state *state; + + req = tevent_req_create(mem_ctx, &state, + struct notifyd_clean_peers_state); + if (req == NULL) { + return NULL; + } + state->ev = ev; + state->notifyd = notifyd; + + subreq = tevent_wakeup_send(state, state->ev, + timeval_current_ofs_msec(30000)); + if (tevent_req_nomem(subreq, req)) { + return tevent_req_post(req, ev); + } + tevent_req_set_callback(subreq, notifyd_clean_peers_next, req); + return req; +} + +static void notifyd_clean_peers_next(struct tevent_req *subreq) +{ + struct tevent_req *req = tevent_req_callback_data( + subreq, struct tevent_req); + struct notifyd_clean_peers_state *state = tevent_req_data( + req, struct notifyd_clean_peers_state); + struct notifyd_state *notifyd = state->notifyd; + size_t i; + bool ok; + time_t now = time(NULL); + + ok = tevent_wakeup_recv(subreq); + TALLOC_FREE(subreq); + if (!ok) { + tevent_req_oom(req); + return; + } + + i = 0; + while (i < notifyd->num_peers) { + struct notifyd_peer *p = notifyd->peers[i]; + + if ((now - p->last_broadcast) > 60) { + struct server_id_buf idbuf; + + /* + * Haven't heard for more than 60 seconds. Call this + * peer dead + */ + + DBG_DEBUG("peer %s died\n", + server_id_str_buf(p->pid, &idbuf)); + /* + * This implicitly decrements notifyd->num_peers + */ + TALLOC_FREE(p); + } else { + i += 1; + } + } + + subreq = tevent_wakeup_send(state, state->ev, + timeval_current_ofs_msec(30000)); + if (tevent_req_nomem(subreq, req)) { + return; + } + tevent_req_set_callback(subreq, notifyd_clean_peers_next, req); +} + +static int notifyd_clean_peers_recv(struct tevent_req *req) +{ + return tevent_req_simple_recv_unix(req); +} + +static int notifyd_add_proxy_syswatches(struct db_record *rec, + void *private_data) +{ + struct notifyd_state *state = talloc_get_type_abort( + private_data, struct notifyd_state); + struct db_context *db = dbwrap_record_get_db(rec); + TDB_DATA key = dbwrap_record_get_key(rec); + TDB_DATA value = dbwrap_record_get_value(rec); + struct notifyd_instance *instances = NULL; + size_t num_instances = 0; + size_t i; + char path[key.dsize+1]; + bool ok; + + memcpy(path, key.dptr, key.dsize); + path[key.dsize] = '\0'; + + ok = notifyd_parse_entry(value.dptr, value.dsize, &instances, + &num_instances); + if (!ok) { + DBG_WARNING("Could not parse notifyd entry for %s\n", path); + return 0; + } + + for (i=0; i<num_instances; i++) { + struct notifyd_instance *instance = &instances[i]; + uint32_t filter = instance->instance.filter; + uint32_t subdir_filter = instance->instance.subdir_filter; + int ret; + + /* + * This is a remote database. Pointers that we were + * given don't make sense locally. Initialize to NULL + * in case sys_notify_watch fails. + */ + instances[i].sys_watch = NULL; + + ret = state->sys_notify_watch( + db, state->sys_notify_ctx, path, + &filter, &subdir_filter, + notifyd_sys_callback, state->msg_ctx, + &instance->sys_watch); + if (ret != 0) { + DBG_WARNING("inotify_watch returned %s\n", + strerror(errno)); + } + } + + return 0; +} + +static int notifyd_db_del_syswatches(struct db_record *rec, void *private_data) +{ + TDB_DATA key = dbwrap_record_get_key(rec); + TDB_DATA value = dbwrap_record_get_value(rec); + struct notifyd_instance *instances = NULL; + size_t num_instances = 0; + size_t i; + bool ok; + + ok = notifyd_parse_entry(value.dptr, value.dsize, &instances, + &num_instances); + if (!ok) { + DBG_WARNING("Could not parse notifyd entry for %.*s\n", + (int)key.dsize, (char *)key.dptr); + return 0; + } + for (i=0; i<num_instances; i++) { + TALLOC_FREE(instances[i].sys_watch); + } + return 0; +} + +static int notifyd_peer_destructor(struct notifyd_peer *p) +{ + struct notifyd_state *state = p->state; + size_t i; + + if (p->db != NULL) { + dbwrap_traverse_read(p->db, notifyd_db_del_syswatches, + NULL, NULL); + } + + for (i = 0; i<state->num_peers; i++) { + if (p == state->peers[i]) { + state->peers[i] = state->peers[state->num_peers-1]; + state->num_peers -= 1; + break; + } + } + return 0; +} + +static struct notifyd_peer *notifyd_peer_new( + struct notifyd_state *state, struct server_id pid) +{ + struct notifyd_peer *p, **tmp; + + tmp = talloc_realloc(state, state->peers, struct notifyd_peer *, + state->num_peers+1); + if (tmp == NULL) { + return NULL; + } + state->peers = tmp; + + p = talloc_zero(state->peers, struct notifyd_peer); + if (p == NULL) { + return NULL; + } + p->state = state; + p->pid = pid; + + state->peers[state->num_peers] = p; + state->num_peers += 1; + + talloc_set_destructor(p, notifyd_peer_destructor); + + return p; +} + +static void notifyd_apply_reclog(struct notifyd_peer *peer, + const uint8_t *msg, size_t msglen) +{ + struct notifyd_state *state = peer->state; + DATA_BLOB blob = { .data = discard_const_p(uint8_t, msg), + .length = msglen }; + struct server_id_buf idbuf; + struct messaging_reclog *log; + enum ndr_err_code ndr_err; + uint32_t i; + + if (peer->db == NULL) { + /* + * No db yet + */ + return; + } + + log = talloc(peer, struct messaging_reclog); + if (log == NULL) { + DBG_DEBUG("talloc failed\n"); + return; + } + + ndr_err = ndr_pull_struct_blob_all( + &blob, log, log, + (ndr_pull_flags_fn_t)ndr_pull_messaging_reclog); + if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) { + DBG_DEBUG("ndr_pull_messaging_reclog failed: %s\n", + ndr_errstr(ndr_err)); + goto fail; + } + + DBG_DEBUG("Got %"PRIu32" recs index %"PRIu64" from %s\n", + log->num_recs, + log->rec_index, + server_id_str_buf(peer->pid, &idbuf)); + + if (log->rec_index != peer->rec_index) { + DBG_INFO("Got rec index %"PRIu64" from %s, " + "expected %"PRIu64"\n", + log->rec_index, + server_id_str_buf(peer->pid, &idbuf), + peer->rec_index); + goto fail; + } + + for (i=0; i<log->num_recs; i++) { + struct messaging_rec *r = log->recs[i]; + struct notify_rec_change_msg *chg; + size_t pathlen; + bool ok; + struct notify_instance instance; + + ok = notifyd_parse_rec_change(r->buf.data, r->buf.length, + &chg, &pathlen); + if (!ok) { + DBG_INFO("notifyd_parse_rec_change failed\n"); + goto fail; + } + + /* avoid SIGBUS */ + memcpy(&instance, &chg->instance, sizeof(instance)); + + ok = notifyd_apply_rec_change(&r->src, chg->path, pathlen, + &instance, peer->db, + state->sys_notify_watch, + state->sys_notify_ctx, + state->msg_ctx); + if (!ok) { + DBG_INFO("notifyd_apply_rec_change failed\n"); + goto fail; + } + } + + peer->rec_index += 1; + peer->last_broadcast = time(NULL); + + TALLOC_FREE(log); + return; + +fail: + DBG_DEBUG("Dropping peer %s\n", + server_id_str_buf(peer->pid, &idbuf)); + TALLOC_FREE(peer); +} + +/* + * Receive messaging_reclog (log of MSG_SMB_NOTIFY_REC_CHANGE + * messages) broadcasts by other notifyds. Several cases: + * + * We don't know the source. This creates a new peer. Creating a peer + * involves asking the peer for its full database. We assume ordered + * messages, so the new database will arrive before the next broadcast + * will. + * + * We know the source and the log index matches. We will apply the log + * locally to our peer's db as if we had received it from a local + * client. + * + * We know the source but the log index does not match. This means we + * lost a message. We just drop the whole peer and wait for the next + * broadcast, which will then trigger a fresh database pull. + */ + +static int notifyd_snoop_broadcast(struct tevent_context *ev, + uint32_t src_vnn, uint32_t dst_vnn, + uint64_t dst_srvid, + const uint8_t *msg, size_t msglen, + void *private_data) +{ + struct notifyd_state *state = talloc_get_type_abort( + private_data, struct notifyd_state); + struct server_id my_id = messaging_server_id(state->msg_ctx); + struct notifyd_peer *p; + uint32_t i; + uint32_t msg_type; + struct server_id src, dst; + struct server_id_buf idbuf; + NTSTATUS status; + + if (msglen < MESSAGE_HDR_LENGTH) { + DBG_DEBUG("Got short broadcast\n"); + return 0; + } + message_hdr_get(&msg_type, &src, &dst, msg); + + if (msg_type != MSG_SMB_NOTIFY_REC_CHANGES) { + DBG_DEBUG("Got message %"PRIu32", ignoring\n", msg_type); + return 0; + } + if (server_id_equal(&src, &my_id)) { + DBG_DEBUG("Ignoring my own broadcast\n"); + return 0; + } + + DBG_DEBUG("Got MSG_SMB_NOTIFY_REC_CHANGES from %s\n", + server_id_str_buf(src, &idbuf)); + + for (i=0; i<state->num_peers; i++) { + if (server_id_equal(&state->peers[i]->pid, &src)) { + + DBG_DEBUG("Applying changes to peer %"PRIu32"\n", i); + + notifyd_apply_reclog(state->peers[i], + msg + MESSAGE_HDR_LENGTH, + msglen - MESSAGE_HDR_LENGTH); + return 0; + } + } + + DBG_DEBUG("Creating new peer for %s\n", + server_id_str_buf(src, &idbuf)); + + p = notifyd_peer_new(state, src); + if (p == NULL) { + DBG_DEBUG("notifyd_peer_new failed\n"); + return 0; + } + + status = messaging_send_buf(state->msg_ctx, src, MSG_SMB_NOTIFY_GET_DB, + NULL, 0); + if (!NT_STATUS_IS_OK(status)) { + DBG_DEBUG("messaging_send_buf failed: %s\n", + nt_errstr(status)); + TALLOC_FREE(p); + return 0; + } + + return 0; +} +#endif |