/*
 * Unix SMB/CIFS implementation.
 *
 * Copyright (C) Volker Lendecke 2014
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */

#include "replace.h"
#include <tevent.h>
#include "notifyd_private.h"
#include "lib/util/server_id.h"
#include "lib/util/data_blob.h"
#include "librpc/gen_ndr/notify.h"
#include "librpc/gen_ndr/messaging.h"
#include "librpc/gen_ndr/server_id.h"
#include "lib/dbwrap/dbwrap.h"
#include "lib/dbwrap/dbwrap_rbt.h"
#include "messages.h"
#include "tdb.h"
#include "util_tdb.h"
#include "notifyd.h"
#include "lib/util/server_id_db.h"
#include "lib/util/tevent_unix.h"
#include "lib/util/tevent_ntstatus.h"
#include "ctdbd_conn.h"
#include "ctdb_srvids.h"
#include "server_id_db_util.h"
#include "lib/util/iov_buf.h"
#include "messages_util.h"

#ifdef CLUSTER_SUPPORT
#include "ctdb_protocol.h"
#endif

struct notifyd_peer;

/*
 * All of notifyd's state
 */

struct notifyd_state {
	struct tevent_context *ev;
	struct messaging_context *msg_ctx;
	struct ctdbd_connection *ctdbd_conn;

	/*
	 * Database of everything clients show interest in. Indexed by
	 * absolute path. The database keys are not 0-terminated
	 * to allow the critical operation, notifyd_trigger, to walk
	 * the structure from the top without adding intermediate 0s.
	 * The database records contain an array of
	 *
	 * struct notifyd_instance
	 *
	 * to be maintained and parsed by notifyd_parse_entry()
	 */
	struct db_context *entries;

	/*
	 * In the cluster case, this is the place where we store a log
	 * of all MSG_SMB_NOTIFY_REC_CHANGE messages. We just 1:1
	 * forward them to our peer notifyd's in the cluster once a
	 * second or when the log grows too large.
	 */
	struct messaging_reclog *log;

	/*
	 * Array of companion notifyd's in a cluster. Every notifyd
	 * broadcasts its messaging_reclog to every other notifyd in
	 * the cluster. This is done by making ctdb send a message to
	 * srvid CTDB_SRVID_SAMBA_NOTIFY_PROXY with destination node
	 * number CTDB_BROADCAST_CONNECTED. Everybody in the cluster who
	 * had called register_with_ctdbd this srvid will receive the
	 * broadcasts.
	 *
	 * Database replication happens via these broadcasts. Also,
	 * they serve as liveness indication. If a notifyd receives a
	 * broadcast from an unknown peer, it will create one for this
	 * srvid. Also when we don't hear anything from a peer for a
	 * while, we will discard it.
	 */
	struct notifyd_peer **peers;
	size_t num_peers;

	sys_notify_watch_fn sys_notify_watch;
	struct sys_notify_context *sys_notify_ctx;
};

/*
 * One remote notifyd in the cluster: its server id, the index of the
 * next reclog we expect from it, our copy of its database (NULL until
 * the database has arrived) and the time we last applied a broadcast.
 */
struct notifyd_peer {
	struct notifyd_state *state;
	struct server_id pid;
	uint64_t rec_index;
	struct db_context *db;
	time_t last_broadcast;
};

static void notifyd_rec_change(struct messaging_context *msg_ctx,
			       void *private_data, uint32_t msg_type,
			       struct server_id src, DATA_BLOB *data);
static void notifyd_trigger(struct messaging_context *msg_ctx,
			    void *private_data, uint32_t msg_type,
			    struct server_id src, DATA_BLOB *data);
static void notifyd_get_db(struct messaging_context *msg_ctx,
			   void *private_data, uint32_t msg_type,
			   struct server_id src, DATA_BLOB *data);

#ifdef CLUSTER_SUPPORT
static void notifyd_got_db(struct messaging_context *msg_ctx,
			   void *private_data, uint32_t msg_type,
			   struct server_id src, DATA_BLOB *data);
static void notifyd_broadcast_reclog(struct ctdbd_connection *ctdbd_conn,
				     struct server_id src,
				     struct messaging_reclog *log);
#endif
static void notifyd_sys_callback(struct sys_notify_context *ctx,
				 void *private_data, struct notify_event *ev,
				 uint32_t filter);

#ifdef CLUSTER_SUPPORT
static struct tevent_req *notifyd_broadcast_reclog_send(
	TALLOC_CTX *mem_ctx, struct tevent_context *ev,
	struct ctdbd_connection *ctdbd_conn, struct server_id src,
	struct messaging_reclog *log);
static int notifyd_broadcast_reclog_recv(struct tevent_req *req);

static struct tevent_req *notifyd_clean_peers_send(
	TALLOC_CTX *mem_ctx, struct tevent_context *ev,
	struct notifyd_state *notifyd);
static int notifyd_clean_peers_recv(struct tevent_req *req);
#endif

/*
 * Fallback used when the caller of notifyd_send() passes no
 * sys_notify_watch function: hands back a NULL watch handle and
 * reports success without watching anything.
 */
static int sys_notify_watch_dummy(
	TALLOC_CTX *mem_ctx,
	struct sys_notify_context *ctx,
	const char *path,
	uint32_t *filter,
	uint32_t *subdir_filter,
	void (*callback)(struct sys_notify_context *ctx,
			 void *private_data,
			 struct notify_event *ev,
			 uint32_t filter),
	void *private_data,
	void *handle_p)
{
	void **handle = handle_p;
	*handle = NULL;
	return 0;
}

#ifdef CLUSTER_SUPPORT
static void notifyd_broadcast_reclog_finished(struct tevent_req *subreq);
static void notifyd_clean_peers_finished(struct tevent_req *subreq);
static int notifyd_snoop_broadcast(struct tevent_context *ev,
				   uint32_t src_vnn, uint32_t dst_vnn,
				   uint64_t dst_srvid,
				   const uint8_t *msg, size_t msglen,
				   void *private_data);
#endif

/*
 * Start the notify daemon.
 *
 * Registers the REC_CHANGE, TRIGGER and GET_DB message handlers on
 * msg_ctx and claims the "notify-daemon" name exclusively. If
 * ctdbd_conn is non-NULL and CLUSTER_SUPPORT is compiled in, the
 * replication engine (reclog broadcast, peer cleanup, broadcast
 * snooping) is set up as well.
 *
 * Returns NULL only if the request itself cannot be allocated. Any
 * later setup failure deregisters the handlers registered so far and
 * returns a request that has already been posted with the error.
 */
struct tevent_req *notifyd_send(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
				struct messaging_context *msg_ctx,
				struct ctdbd_connection *ctdbd_conn,
				sys_notify_watch_fn sys_notify_watch,
				struct sys_notify_context *sys_notify_ctx)
{
	struct tevent_req *req;
#ifdef CLUSTER_SUPPORT
	struct tevent_req *subreq;
#endif
	struct notifyd_state *state;
	struct server_id_db *names_db;
	NTSTATUS status;
	int ret;

	req = tevent_req_create(mem_ctx, &state, struct notifyd_state);
	if (req == NULL) {
		return NULL;
	}
	state->ev = ev;
	state->msg_ctx = msg_ctx;
	state->ctdbd_conn = ctdbd_conn;

	if (sys_notify_watch == NULL) {
		sys_notify_watch = sys_notify_watch_dummy;
	}

	state->sys_notify_watch = sys_notify_watch;
	state->sys_notify_ctx = sys_notify_ctx;

	state->entries = db_open_rbt(state);
	if (tevent_req_nomem(state->entries, req)) {
		return tevent_req_post(req, ev);
	}

	status = messaging_register(msg_ctx, state, MSG_SMB_NOTIFY_REC_CHANGE,
				    notifyd_rec_change);
	if (tevent_req_nterror(req, status)) {
		return tevent_req_post(req, ev);
	}

	status = messaging_register(msg_ctx, state, MSG_SMB_NOTIFY_TRIGGER,
				    notifyd_trigger);
	if (tevent_req_nterror(req, status)) {
		goto deregister_rec_change;
	}

	status = messaging_register(msg_ctx, state, MSG_SMB_NOTIFY_GET_DB,
				    notifyd_get_db);
	if (tevent_req_nterror(req, status)) {
		goto deregister_trigger;
	}

	names_db = messaging_names_db(msg_ctx);

	ret = server_id_db_set_exclusive(names_db, "notify-daemon");
	if (ret != 0) {
		DBG_DEBUG("server_id_db_set_exclusive failed: %s\n",
			  strerror(ret));
		tevent_req_error(req, ret);
		goto deregister_get_db;
	}

	if (ctdbd_conn == NULL) {
		/*
		 * No cluster around, skip the database replication
		 * engine
		 */
		return req;
	}

#ifdef CLUSTER_SUPPORT
	status = messaging_register(msg_ctx, state, MSG_SMB_NOTIFY_DB,
				    notifyd_got_db);
	if (tevent_req_nterror(req, status)) {
		goto deregister_get_db;
	}

	state->log = talloc_zero(state, struct messaging_reclog);
	if (tevent_req_nomem(state->log, req)) {
		goto deregister_db;
	}

	subreq = notifyd_broadcast_reclog_send(
		state->log, ev, ctdbd_conn,
		messaging_server_id(msg_ctx),
		state->log);
	if (tevent_req_nomem(subreq, req)) {
		goto deregister_db;
	}
	tevent_req_set_callback(subreq,
				notifyd_broadcast_reclog_finished,
				req);

	subreq = notifyd_clean_peers_send(state, ev, state);
	if (tevent_req_nomem(subreq, req)) {
		goto deregister_db;
	}
	tevent_req_set_callback(subreq, notifyd_clean_peers_finished,
				req);

	ret = register_with_ctdbd(ctdbd_conn,
				  CTDB_SRVID_SAMBA_NOTIFY_PROXY,
				  notifyd_snoop_broadcast, state);
	if (ret != 0) {
		tevent_req_error(req, ret);
		goto deregister_db;
	}
#endif

	return req;

	/*
	 * Error unwinding: each label undoes one registration and falls
	 * through to the ones registered before it.
	 */
#ifdef CLUSTER_SUPPORT
deregister_db:
	messaging_deregister(msg_ctx, MSG_SMB_NOTIFY_DB, state);
#endif
deregister_get_db:
	messaging_deregister(msg_ctx, MSG_SMB_NOTIFY_GET_DB, state);
deregister_trigger:
	messaging_deregister(msg_ctx, MSG_SMB_NOTIFY_TRIGGER, state);
deregister_rec_change:
	messaging_deregister(msg_ctx, MSG_SMB_NOTIFY_REC_CHANGE, state);
	return tevent_req_post(req, ev);
}

#ifdef CLUSTER_SUPPORT

/*
 * Completion callback of the reclog broadcast subrequest: passes its
 * result on to the main notifyd request via tevent_req_error().
 */
static void notifyd_broadcast_reclog_finished(struct tevent_req *subreq)
{
	struct tevent_req *req = tevent_req_callback_data(
		subreq, struct tevent_req);
	int ret;

	ret = notifyd_broadcast_reclog_recv(subreq);
	TALLOC_FREE(subreq);
	tevent_req_error(req, ret);
}

/*
 * Completion callback of the peer cleanup subrequest: passes its
 * result on to the main notifyd request via tevent_req_error().
 */
static void notifyd_clean_peers_finished(struct tevent_req *subreq)
{
	struct tevent_req *req = tevent_req_callback_data(
		subreq, struct tevent_req);
	int ret;

	ret = notifyd_clean_peers_recv(subreq);
	TALLOC_FREE(subreq);
	tevent_req_error(req, ret);
}

#endif

/*
 * Collect the result of notifyd_send() as a unix error code.
 */
int notifyd_recv(struct tevent_req *req)
{
	return tevent_req_simple_recv_unix(req);
}

/*
 * Apply one notify_instance change coming from "client" for "path"
 * to the database "entries".
 *
 * path/pathlen must describe a 0-terminated string; the database key
 * is the path without the terminator. An instance is identified by
 * (client, private_data): a match is overwritten, otherwise a new
 * instance is appended. A change with both filters 0 removes the
 * instance, and the record is deleted once no instance is left.
 *
 * Returns true if the record was stored or deleted successfully.
 */
static bool notifyd_apply_rec_change(
	const struct server_id *client,
	const char *path, size_t pathlen,
	const struct notify_instance *chg,
	struct db_context *entries,
	sys_notify_watch_fn sys_notify_watch,
	struct sys_notify_context *sys_notify_ctx,
	struct messaging_context *msg_ctx)
{
	struct db_record *rec = NULL;
	struct notifyd_instance *instances = NULL;
	size_t num_instances;
	size_t i;
	struct notifyd_instance *instance = NULL;
	TDB_DATA value;
	NTSTATUS status;
	bool ok = false;

	if (pathlen == 0) {
		DBG_WARNING("pathlen==0\n");
		return false;
	}
	if (path[pathlen-1] != '\0') {
		DBG_WARNING("path not 0-terminated\n");
		return false;
	}

	DBG_DEBUG("path=%s, filter=%"PRIu32", subdir_filter=%"PRIu32", "
		  "private_data=%p\n",
		  path, chg->filter, chg->subdir_filter,
		  chg->private_data);

	rec = dbwrap_fetch_locked(
		entries, entries,
		make_tdb_data((const uint8_t *)path, pathlen-1));

	if (rec == NULL) {
		DBG_WARNING("dbwrap_fetch_locked failed\n");
		goto fail;
	}

	num_instances = 0;
	value = dbwrap_record_get_value(rec);

	if (value.dsize != 0) {
		if (!notifyd_parse_entry(value.dptr, value.dsize, NULL,
					 &num_instances)) {
			goto fail;
		}
	}

	/*
	 * Overallocate by one instance to avoid a realloc when adding
	 */
	instances = talloc_array(rec, struct notifyd_instance,
				 num_instances + 1);
	if (instances == NULL) {
		DBG_WARNING("talloc failed\n");
		goto fail;
	}

	if (value.dsize != 0) {
		memcpy(instances, value.dptr, value.dsize);
	}

	/*
	 * NOTE(review): the head of this loop was lost in the text this
	 * was restored from; reconstructed from the surviving condition
	 * and the "i < num_instances" test below -- confirm.
	 */
	for (i=0; i<num_instances; i++) {
		instance = &instances[i];

		if (server_id_equal(&instance->client, client) &&
		    (instance->instance.private_data == chg->private_data)) {
			break;
		}
	}

	if (i < num_instances) {
		instance->instance = *chg;
	} else {
		/*
		 * We've overallocated for one instance
		 */
		instance = &instances[num_instances];

		*instance = (struct notifyd_instance) {
			.client = *client,
			.instance = *chg,
			.internal_filter = chg->filter,
			.internal_subdir_filter = chg->subdir_filter
		};

		num_instances += 1;
	}

	if ((instance->instance.filter != 0) ||
	    (instance->instance.subdir_filter != 0)) {
		int ret;

		TALLOC_FREE(instance->sys_watch);

		ret = sys_notify_watch(entries, sys_notify_ctx, path,
				       &instance->internal_filter,
				       &instance->internal_subdir_filter,
				       notifyd_sys_callback, msg_ctx,
				       &instance->sys_watch);
		if (ret != 0) {
			/*
			 * The error is the return value; errno may be
			 * stale by the time we get here.
			 */
			DBG_WARNING("sys_notify_watch for [%s] returned %s\n",
				    path, strerror(ret));
		}
	}

	if ((instance->instance.filter == 0) &&
	    (instance->instance.subdir_filter == 0)) {
		/* This is a delete request */
		TALLOC_FREE(instance->sys_watch);
		/* Fill the hole with the last array element */
		*instance = instances[num_instances-1];
		num_instances -= 1;
	}

	DBG_DEBUG("%s has %zu instances\n", path, num_instances);

	if (num_instances == 0) {
		status = dbwrap_record_delete(rec);
		if (!NT_STATUS_IS_OK(status)) {
			DBG_WARNING("dbwrap_record_delete returned %s\n",
				    nt_errstr(status));
			goto fail;
		}
	} else {
		value = make_tdb_data(
			(uint8_t *)instances,
			sizeof(struct notifyd_instance) * num_instances);

		status = dbwrap_record_store(rec, value, 0);
		if (!NT_STATUS_IS_OK(status)) {
			DBG_WARNING("dbwrap_record_store returned %s\n",
				    nt_errstr(status));
			goto fail;
		}
	}

	ok = true;
fail:
	/* "instances" is a talloc child of rec and goes away with it */
	TALLOC_FREE(rec);
	return ok;
}

/*
 * Callback handed to sys_notify_watch(): turns a kernel notify event
 * into a MSG_SMB_NOTIFY_TRIGGER message sent to ourselves. The
 * message path is assembled as ev->dir + "/" + ev->path, including
 * the terminating 0.
 */
static void notifyd_sys_callback(struct sys_notify_context *ctx,
				 void *private_data, struct notify_event *ev,
				 uint32_t filter)
{
	struct messaging_context *msg_ctx = talloc_get_type_abort(
		private_data, struct messaging_context);
	struct notify_trigger_msg msg;
	struct iovec iov[4];
	char slash = '/';

	msg = (struct notify_trigger_msg) {
		.when = timespec_current(),
		.action = ev->action,
		.filter = filter,
	};

	iov[0].iov_base = &msg;
	iov[0].iov_len = offsetof(struct notify_trigger_msg, path);
	iov[1].iov_base = discard_const_p(char, ev->dir);
	iov[1].iov_len = strlen(ev->dir);
	iov[2].iov_base = &slash;
	iov[2].iov_len = 1;
	iov[3].iov_base = discard_const_p(char, ev->path);
	iov[3].iov_len = strlen(ev->path)+1;

	messaging_send_iov(
		msg_ctx, messaging_server_id(msg_ctx),
		MSG_SMB_NOTIFY_TRIGGER, iov, ARRAY_SIZE(iov), NULL, 0);
}

/*
 * Interpret buf as a struct notify_rec_change_msg.
 *
 * Only checks that the buffer holds the fixed header plus at least
 * one path byte. *pmsg points into buf (no copy is made), *pathlen is
 * the number of bytes following the header. 0-termination of the
 * path is not checked here.
 */
static bool notifyd_parse_rec_change(uint8_t *buf, size_t bufsize,
				     struct notify_rec_change_msg **pmsg,
				     size_t *pathlen)
{
	struct notify_rec_change_msg *msg;

	if (bufsize < offsetof(struct notify_rec_change_msg, path) + 1) {
		DBG_WARNING("message too short, ignoring: %zu\n", bufsize);
		return false;
	}

	*pmsg = msg = (struct notify_rec_change_msg *)buf;
	*pathlen = bufsize - offsetof(struct notify_rec_change_msg, path);

	DBG_DEBUG("Got rec_change_msg filter=%"PRIu32", "
		  "subdir_filter=%"PRIu32", private_data=%p, path=%.*s\n",
		  msg->instance.filter, msg->instance.subdir_filter,
		  msg->instance.private_data, (int)(*pathlen), msg->path);

	return true;
}

/*
 * Handler for MSG_SMB_NOTIFY_REC_CHANGE: applies the change to our
 * own database and, when a reclog and a ctdbd connection exist,
 * appends the raw message to the reclog for the next broadcast.
 */
static void notifyd_rec_change(struct messaging_context *msg_ctx,
			       void *private_data, uint32_t msg_type,
			       struct server_id src, DATA_BLOB *data)
{
	struct notifyd_state *state = talloc_get_type_abort(
		private_data, struct notifyd_state);
	struct server_id_buf idbuf;
	struct notify_rec_change_msg *msg;
	size_t pathlen;
	bool ok;
	struct notify_instance instance;

	DBG_DEBUG("Got %zu bytes from %s\n", data->length,
		  server_id_str_buf(src, &idbuf));

	ok = notifyd_parse_rec_change(data->data, data->length,
				      &msg, &pathlen);
	if (!ok) {
		return;
	}

	memcpy(&instance, &msg->instance, sizeof(instance)); /* avoid SIGBUS */

	ok = notifyd_apply_rec_change(
		&src, msg->path, pathlen, &instance,
		state->entries, state->sys_notify_watch, state->sys_notify_ctx,
		state->msg_ctx);
	if (!ok) {
		DBG_DEBUG("notifyd_apply_rec_change failed, ignoring\n");
		return;
	}

	if ((state->log == NULL) || (state->ctdbd_conn == NULL)) {
		return;
	}

#ifdef CLUSTER_SUPPORT
	{

	struct messaging_rec **tmp;
	struct messaging_reclog *log;
	struct iovec iov = { .iov_base = data->data, .iov_len = data->length };

	log = state->log;

	tmp = talloc_realloc(log, log->recs, struct messaging_rec *,
			     log->num_recs+1);
	if (tmp == NULL) {
		DBG_WARNING("talloc_realloc failed, ignoring\n");
		return;
	}
	log->recs = tmp;

	log->recs[log->num_recs] = messaging_rec_create(
		log->recs, src, messaging_server_id(msg_ctx),
		msg_type, &iov, 1, NULL, 0);

	if (log->recs[log->num_recs] == NULL) {
		DBG_WARNING("messaging_rec_create failed, ignoring\n");
		return;
	}

	log->num_recs += 1;

	if (log->num_recs >= 100) {
		/*
		 * Don't let the log grow too large
		 */
		notifyd_broadcast_reclog(state->ctdbd_conn,
					 messaging_server_id(msg_ctx), log);
	}

	}
#endif
}

/*
 * Context handed to notifyd_trigger_parser() for every database
 * record that matches a prefix of the triggered path.
 */
struct notifyd_trigger_state {
	struct messaging_context *msg_ctx;
	struct notify_trigger_msg *msg;
	bool recursive;
	bool covered_by_sys_notify;
};

static void notifyd_trigger_parser(TDB_DATA key, TDB_DATA data,
				   void *private_data);

/*
 * Handler for MSG_SMB_NOTIFY_TRIGGER.
 *
 * Walks the triggered path from the top: every directory prefix (cut
 * at a '/' after the first character) is looked up in our own
 * database and, for triggers from our own node, in the databases of
 * all peers that have delivered one.
 */
static void notifyd_trigger(struct messaging_context *msg_ctx,
			    void *private_data, uint32_t msg_type,
			    struct server_id src, DATA_BLOB *data)
{
	struct notifyd_state *state = talloc_get_type_abort(
		private_data, struct notifyd_state);
	struct server_id my_id = messaging_server_id(msg_ctx);
	struct notifyd_trigger_state tstate;
	const char *path;
	const char *p, *next_p;

	if (data->length < offsetof(struct notify_trigger_msg, path) + 1) {
		DBG_WARNING("message too short, ignoring: %zu\n",
			    data->length);
		return;
	}
	if (data->data[data->length-1] != 0) {
		DBG_WARNING("path not 0-terminated, ignoring\n");
		return;
	}

	tstate.msg_ctx = msg_ctx;

	/*
	 * True for triggers from another process on our own node,
	 * false for the ones we sent to ourselves.
	 */
	tstate.covered_by_sys_notify = (src.vnn == my_id.vnn);
	tstate.covered_by_sys_notify &= !server_id_equal(&src, &my_id);

	tstate.msg = (struct notify_trigger_msg *)data->data;
	path = tstate.msg->path;

	DBG_DEBUG("Got trigger_msg action=%"PRIu32", filter=%"PRIu32", "
		  "path=%s\n", tstate.msg->action, tstate.msg->filter, path);

	if (path[0] != '/') {
		DBG_WARNING("path %s does not start with /, ignoring\n",
			    path);
		return;
	}

	for (p = strchr(path+1, '/'); p != NULL; p = next_p) {
		ptrdiff_t path_len = p - path;
		TDB_DATA key;
		uint32_t i;

		next_p = strchr(p+1, '/');
		/* More components follow: use the subdir filters */
		tstate.recursive = (next_p != NULL);

		DBG_DEBUG("Trying path %.*s\n", (int)path_len, path);

		key = (TDB_DATA) { .dptr = discard_const_p(uint8_t, path),
				   .dsize = path_len };

		dbwrap_parse_record(state->entries, key,
				    notifyd_trigger_parser, &tstate);

		if (state->peers == NULL) {
			continue;
		}

		if (src.vnn != my_id.vnn) {
			continue;
		}

		for (i=0; i<state->num_peers; i++) {
			if (state->peers[i]->db == NULL) {
				/*
				 * Inactive peer, did not get a db yet
				 */
				continue;
			}
			dbwrap_parse_record(state->peers[i]->db, key,
					    notifyd_trigger_parser, &tstate);
		}
	}
}

static void notifyd_send_delete(struct messaging_context *msg_ctx,
				TDB_DATA key,
				struct notifyd_instance *instance);

/*
 * dbwrap_parse_record() callback for notifyd_trigger(): sends
 * MSG_PVFS_NOTIFY to every instance in the record whose filter
 * matches the trigger. The event path is the part of the triggered
 * path below the record's key. A local client whose send fails with
 * NT_STATUS_OBJECT_NAME_NOT_FOUND is scheduled for deletion.
 */
static void notifyd_trigger_parser(TDB_DATA key, TDB_DATA data,
				   void *private_data)

{
	struct notifyd_trigger_state *tstate = private_data;
	struct notify_event_msg msg = { .action = tstate->msg->action,
					.when = tstate->msg->when };
	struct iovec iov[2];
	size_t path_len = key.dsize;
	struct notifyd_instance *instances = NULL;
	size_t num_instances = 0;
	size_t i;

	if (!notifyd_parse_entry(data.dptr, data.dsize, &instances,
				 &num_instances)) {
		DBG_DEBUG("Could not parse notifyd_entry\n");
		return;
	}

	DBG_DEBUG("Found %zu instances for %.*s\n", num_instances,
		  (int)key.dsize, (char *)key.dptr);

	iov[0].iov_base = &msg;
	iov[0].iov_len = offsetof(struct notify_event_msg, path);
	/* Skip the directory prefix and the '/' that follows it */
	iov[1].iov_base = tstate->msg->path + path_len + 1;
	iov[1].iov_len = strlen((char *)(iov[1].iov_base)) + 1;

	/*
	 * NOTE(review): the loop head and the per-iteration declarations
	 * were lost in the text this was restored from; reconstructed
	 * from how instance, idbuf, i_filter and status are used below
	 * -- confirm.
	 */
	for (i=0; i<num_instances; i++) {
		struct notifyd_instance *instance = &instances[i];
		struct server_id_buf idbuf;
		uint32_t i_filter;
		NTSTATUS status;

		if (tstate->covered_by_sys_notify) {
			if (tstate->recursive) {
				i_filter = instance->internal_subdir_filter;
			} else {
				i_filter = instance->internal_filter;
			}
		} else {
			if (tstate->recursive) {
				i_filter = instance->instance.subdir_filter;
			} else {
				i_filter = instance->instance.filter;
			}
		}

		if ((i_filter & tstate->msg->filter) == 0) {
			continue;
		}

		msg.private_data = instance->instance.private_data;

		status = messaging_send_iov(
			tstate->msg_ctx, instance->client,
			MSG_PVFS_NOTIFY, iov, ARRAY_SIZE(iov), NULL, 0);

		DBG_DEBUG("messaging_send_iov to %s returned %s\n",
			  server_id_str_buf(instance->client, &idbuf),
			  nt_errstr(status));

		if (NT_STATUS_EQUAL(status, NT_STATUS_OBJECT_NAME_NOT_FOUND) &&
		    procid_is_local(&instance->client)) {
			/*
			 * That process has died
			 */
			notifyd_send_delete(tstate->msg_ctx, key, instance);
			continue;
		}

		if (!NT_STATUS_IS_OK(status)) {
			DBG_WARNING("messaging_send_iov returned %s\n",
				    nt_errstr(status));
		}
	}
}

/*
 * Send a delete request to ourselves to properly discard a notify
 * record for an smbd that has died.
 */

static void notifyd_send_delete(struct messaging_context *msg_ctx,
				TDB_DATA key,
				struct notifyd_instance *instance)
{
	/* Both filters stay 0, which notifyd_apply_rec_change deletes */
	struct notify_rec_change_msg msg = {
		.instance.private_data = instance->instance.private_data
	};
	uint8_t nul = 0;
	struct iovec iov[3];
	int ret;

	/*
	 * Send a rec_change to ourselves to delete a dead entry
	 */

	iov[0] = (struct iovec) {
		.iov_base = &msg,
		.iov_len = offsetof(struct notify_rec_change_msg, path) };
	iov[1] = (struct iovec) {
		.iov_base = key.dptr, .iov_len = key.dsize };
	/* Database keys are not 0-terminated, the message path must be */
	iov[2] = (struct iovec) {
		.iov_base = &nul, .iov_len = sizeof(nul) };

	ret = messaging_send_iov_from(
		msg_ctx, instance->client, messaging_server_id(msg_ctx),
		MSG_SMB_NOTIFY_REC_CHANGE, iov, ARRAY_SIZE(iov), NULL, 0);

	if (ret != 0) {
		DBG_WARNING("messaging_send_iov_from returned %s\n",
			    strerror(ret));
	}
}

/*
 * Handler for MSG_SMB_NOTIFY_GET_DB: replies to the sender with
 * MSG_SMB_NOTIFY_DB carrying our current reclog index (UINT64_MAX if
 * we have no reclog) in 8 bytes, followed by the marshalled
 * "entries" database.
 */
static void notifyd_get_db(struct messaging_context *msg_ctx,
			   void *private_data, uint32_t msg_type,
			   struct server_id src, DATA_BLOB *data)
{
	struct notifyd_state *state = talloc_get_type_abort(
		private_data, struct notifyd_state);
	struct server_id_buf id1, id2;
	NTSTATUS status;
	uint64_t rec_index = UINT64_MAX;
	uint8_t index_buf[sizeof(uint64_t)];
	size_t dbsize;
	uint8_t *buf;
	struct iovec iov[2];

	/* First pass only determines the required buffer size */
	dbsize = dbwrap_marshall(state->entries, NULL, 0);

	buf = talloc_array(talloc_tos(), uint8_t, dbsize);
	if (buf == NULL) {
		DBG_WARNING("talloc_array(%zu) failed\n", dbsize);
		return;
	}

	dbsize = dbwrap_marshall(state->entries, buf, dbsize);

	if (dbsize != talloc_get_size(buf)) {
		DBG_DEBUG("dbsize changed: %zu->%zu\n",
			  talloc_get_size(buf), dbsize);
		TALLOC_FREE(buf);
		return;
	}

	if (state->log != NULL) {
		rec_index = state->log->rec_index;
	}
	SBVAL(index_buf, 0, rec_index);

	iov[0] = (struct iovec) { .iov_base = index_buf,
				  .iov_len = sizeof(index_buf) };
	iov[1] = (struct iovec) { .iov_base = buf,
				  .iov_len = dbsize };

	DBG_DEBUG("Sending %zu bytes to %s->%s\n",
		  iov_buflen(iov, ARRAY_SIZE(iov)),
		  server_id_str_buf(messaging_server_id(msg_ctx), &id1),
		  server_id_str_buf(src, &id2));

	status = messaging_send_iov(msg_ctx, src, MSG_SMB_NOTIFY_DB,
				    iov, ARRAY_SIZE(iov), NULL, 0);
	TALLOC_FREE(buf);
	if (!NT_STATUS_IS_OK(status)) {
		DBG_WARNING("messaging_send_iov failed: %s\n",
			    nt_errstr(status));
	}
}

#ifdef CLUSTER_SUPPORT

static int notifyd_add_proxy_syswatches(struct db_record *rec,
					void *private_data);

/*
 * Handler for MSG_SMB_NOTIFY_DB: a peer answered our GET_DB request.
 *
 * The message is 8 bytes of reclog index followed by the marshalled
 * database. On success the peer gets its own rbt database and local
 * sys watches are set up for its entries. Messages from senders that
 * are not in the peer list are ignored; a malformed message drops
 * the peer.
 */
static void notifyd_got_db(struct messaging_context *msg_ctx,
			   void *private_data, uint32_t msg_type,
			   struct server_id src, DATA_BLOB *data)
{
	struct notifyd_state *state = talloc_get_type_abort(
		private_data, struct notifyd_state);
	struct notifyd_peer *p = NULL;
	struct server_id_buf idbuf;
	NTSTATUS status;
	int count = 0;
	size_t i;

	for (i=0; i<state->num_peers; i++) {
		if (server_id_equal(&src, &state->peers[i]->pid)) {
			p = state->peers[i];
			break;
		}
	}

	if (p == NULL) {
		DBG_DEBUG("Did not find peer for db from %s\n",
			  server_id_str_buf(src, &idbuf));
		return;
	}

	if (data->length < 8) {
		DBG_DEBUG("Got short db length %zu from %s\n", data->length,
			  server_id_str_buf(src, &idbuf));
		TALLOC_FREE(p);
		return;
	}

	p->rec_index = BVAL(data->data, 0);

	p->db = db_open_rbt(p);
	if (p->db == NULL) {
		DBG_DEBUG("db_open_rbt failed\n");
		TALLOC_FREE(p);
		return;
	}

	status = dbwrap_unmarshall(p->db, data->data + 8,
				   data->length - 8);
	if (!NT_STATUS_IS_OK(status)) {
		DBG_DEBUG("dbwrap_unmarshall returned %s for db %s\n",
			  nt_errstr(status),
			  server_id_str_buf(src, &idbuf));
		TALLOC_FREE(p);
		return;
	}

	/*
	 * A failed traverse is only logged, the peer is kept. count
	 * starts at 0 so the debug line below never prints an
	 * uninitialized value.
	 */
	status = dbwrap_traverse_read(p->db, notifyd_add_proxy_syswatches,
				      state, &count);
	if (!NT_STATUS_IS_OK(status)) {
		DBG_WARNING("dbwrap_traverse_read returned %s\n",
			    nt_errstr(status));
	}

	DBG_DEBUG("Database from %s contained %d records\n",
		  server_id_str_buf(src, &idbuf), count);
}

/*
 * Push the reclog to all connected nodes as one
 * MSG_SMB_NOTIFY_REC_CHANGES message on srvid
 * CTDB_SRVID_SAMBA_NOTIFY_PROXY.
 *
 * The collected records are discarded whether or not the send
 * worked; rec_index only advances after a successful send.
 */
static void notifyd_broadcast_reclog(struct ctdbd_connection *ctdbd_conn,
				     struct server_id src,
				     struct messaging_reclog *log)
{
	enum ndr_err_code ndr_err;
	uint8_t msghdr[MESSAGE_HDR_LENGTH];
	DATA_BLOB blob;
	struct iovec iov[2];
	int ret;

	if (log == NULL) {
		return;
	}

	DBG_DEBUG("rec_index=%"PRIu64", num_recs=%"PRIu32"\n",
		  log->rec_index, log->num_recs);

	message_hdr_put(msghdr, MSG_SMB_NOTIFY_REC_CHANGES, src,
			(struct server_id) {0 });
	iov[0] = (struct iovec) { .iov_base = msghdr,
				  .iov_len = sizeof(msghdr) };

	ndr_err = ndr_push_struct_blob(
		&blob, log, log,
		(ndr_push_flags_fn_t)ndr_push_messaging_reclog);
	if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
		DBG_WARNING("ndr_push_messaging_recs failed: %s\n",
			    ndr_errstr(ndr_err));
		goto done;
	}
	iov[1] = (struct iovec) { .iov_base = blob.data,
				  .iov_len = blob.length };

	ret = ctdbd_messaging_send_iov(
		ctdbd_conn, CTDB_BROADCAST_CONNECTED,
		CTDB_SRVID_SAMBA_NOTIFY_PROXY, iov, ARRAY_SIZE(iov));
	TALLOC_FREE(blob.data);
	if (ret != 0) {
		DBG_WARNING("ctdbd_messaging_send failed: %s\n",
			    strerror(ret));
		goto done;
	}

	log->rec_index += 1;

done:
	log->num_recs = 0;
	TALLOC_FREE(log->recs);
}

/*
 * State of the periodic reclog broadcast request
 */
struct notifyd_broadcast_reclog_state {
	struct tevent_context *ev;
	struct ctdbd_connection *ctdbd_conn;
	struct server_id src;
	struct messaging_reclog *log;
};

static void notifyd_broadcast_reclog_next(struct tevent_req *subreq);

/*
 * Start a request that broadcasts "log" once per second. It re-arms
 * its own timer and only finishes on an error.
 */
static struct tevent_req *notifyd_broadcast_reclog_send(
	TALLOC_CTX *mem_ctx, struct tevent_context *ev,
	struct ctdbd_connection *ctdbd_conn, struct server_id src,
	struct messaging_reclog *log)
{
	struct tevent_req *req, *subreq;
	struct notifyd_broadcast_reclog_state *state;

	req = tevent_req_create(mem_ctx, &state,
				struct notifyd_broadcast_reclog_state);
	if (req == NULL) {
		return NULL;
	}
	state->ev = ev;
	state->ctdbd_conn = ctdbd_conn;
	state->src = src;
	state->log = log;

	subreq = tevent_wakeup_send(state, state->ev,
				    timeval_current_ofs_msec(1000));
	if (tevent_req_nomem(subreq, req)) {
		return tevent_req_post(req, ev);
	}
	tevent_req_set_callback(subreq, notifyd_broadcast_reclog_next, req);
	return req;
}

/*
 * Timer callback: broadcast the reclog and arm the next wakeup.
 */
static void notifyd_broadcast_reclog_next(struct tevent_req *subreq)
{
	struct tevent_req *req = tevent_req_callback_data(
		subreq, struct tevent_req);
	struct notifyd_broadcast_reclog_state *state = tevent_req_data(
		req, struct notifyd_broadcast_reclog_state);
	bool ok;

	ok = tevent_wakeup_recv(subreq);
	TALLOC_FREE(subreq);
	if (!ok) {
		tevent_req_oom(req);
		return;
	}

	notifyd_broadcast_reclog(state->ctdbd_conn, state->src, state->log);

	subreq = tevent_wakeup_send(state, state->ev,
				    timeval_current_ofs_msec(1000));
	if (tevent_req_nomem(subreq, req)) {
		return;
	}
	tevent_req_set_callback(subreq, notifyd_broadcast_reclog_next, req);
}

/*
 * Collect the result of the reclog broadcast request as a unix error
 * code.
 */
static int notifyd_broadcast_reclog_recv(struct tevent_req *req)
{
	return tevent_req_simple_recv_unix(req);
}

/*
 * State of the periodic peer cleanup request
 */
struct notifyd_clean_peers_state {
	struct tevent_context *ev;
	struct notifyd_state *notifyd;
};

static void notifyd_clean_peers_next(struct tevent_req *subreq);

/*
 * Start a request that checks every 30 seconds for peers that have
 * gone silent. It re-arms its own timer and only finishes on an
 * error.
 */
static struct tevent_req *notifyd_clean_peers_send(
	TALLOC_CTX *mem_ctx, struct tevent_context *ev,
	struct notifyd_state *notifyd)
{
	struct tevent_req *req, *subreq;
	struct notifyd_clean_peers_state *state;

	req = tevent_req_create(mem_ctx, &state,
				struct notifyd_clean_peers_state);
	if (req == NULL) {
		return NULL;
	}
	state->ev = ev;
	state->notifyd = notifyd;

	subreq = tevent_wakeup_send(state, state->ev,
				    timeval_current_ofs_msec(30000));
	if (tevent_req_nomem(subreq, req)) {
		return tevent_req_post(req, ev);
	}
	tevent_req_set_callback(subreq, notifyd_clean_peers_next, req);
	return req;
}

/*
 * Timer callback: free every peer that has not been heard from for
 * more than 60 seconds, then arm the next wakeup.
 */
static void notifyd_clean_peers_next(struct tevent_req *subreq)
{
	struct tevent_req *req = tevent_req_callback_data(
		subreq, struct tevent_req);
	struct notifyd_clean_peers_state *state = tevent_req_data(
		req, struct notifyd_clean_peers_state);
	struct notifyd_state *notifyd = state->notifyd;
	size_t i;
	bool ok;
	time_t now = time(NULL);

	ok = tevent_wakeup_recv(subreq);
	TALLOC_FREE(subreq);
	if (!ok) {
		tevent_req_oom(req);
		return;
	}

	i = 0;
	while (i < notifyd->num_peers) {
		struct notifyd_peer *p = notifyd->peers[i];

		if ((now - p->last_broadcast) > 60) {
			struct server_id_buf idbuf;

			/*
			 * Haven't heard for more than 60 seconds. Call this
			 * peer dead
			 */

			DBG_DEBUG("peer %s died\n",
				  server_id_str_buf(p->pid, &idbuf));
			/*
			 * This implicitly decrements notifyd->num_peers
			 */
			TALLOC_FREE(p);
		} else {
			i += 1;
		}
	}

	subreq = tevent_wakeup_send(state, state->ev,
				    timeval_current_ofs_msec(30000));
	if (tevent_req_nomem(subreq, req)) {
		return;
	}
	tevent_req_set_callback(subreq, notifyd_clean_peers_next, req);
}

/*
 * Collect the result of the peer cleanup request as a unix error
 * code.
 */
static int notifyd_clean_peers_recv(struct tevent_req *req)
{
	return tevent_req_simple_recv_unix(req);
}

/*
 * dbwrap_traverse_read() callback run over a freshly received peer
 * database: sets up a local sys watch for every instance in the
 * record. Always returns 0.
 */
static int notifyd_add_proxy_syswatches(struct db_record *rec,
					void *private_data)
{
	struct notifyd_state *state = talloc_get_type_abort(
		private_data, struct notifyd_state);
	struct db_context *db = dbwrap_record_get_db(rec);
	TDB_DATA key = dbwrap_record_get_key(rec);
	TDB_DATA value = dbwrap_record_get_value(rec);
	struct notifyd_instance *instances = NULL;
	size_t num_instances = 0;
	size_t i;
	char path[key.dsize+1];
	bool ok;

	/* Keys are not 0-terminated, sys_notify_watch wants a string */
	memcpy(path, key.dptr, key.dsize);
	path[key.dsize] = '\0';

	ok = notifyd_parse_entry(value.dptr, value.dsize, &instances,
				 &num_instances);
	if (!ok) {
		DBG_WARNING("Could not parse notifyd entry for %s\n", path);
		return 0;
	}

	/*
	 * NOTE(review): the loop head and the "instance" declaration
	 * were lost in the text this was restored from; reconstructed
	 * from the uses below -- confirm.
	 */
	for (i=0; i<num_instances; i++) {
		struct notifyd_instance *instance = &instances[i];
		uint32_t filter = instance->instance.filter;
		uint32_t subdir_filter = instance->instance.subdir_filter;
		int ret;

		/*
		 * This is a remote database. Pointers that we were
		 * given don't make sense locally. Initialize to NULL
		 * in case sys_notify_watch fails.
		 */
		instances[i].sys_watch = NULL;

		ret = state->sys_notify_watch(
			db, state->sys_notify_ctx, path,
			&filter, &subdir_filter,
			notifyd_sys_callback, state->msg_ctx,
			&instance->sys_watch);
		if (ret != 0) {
			/*
			 * The error is the return value; errno may be
			 * stale by the time we get here.
			 */
			DBG_WARNING("inotify_watch returned %s\n",
				    strerror(ret));
		}
	}

	return 0;
}

/*
 * dbwrap_traverse_read() callback: releases the sys watches of all
 * instances in the record. Always returns 0.
 *
 * NOTE(review): everything after the parse check of this function and
 * the head of notifyd_peer_destructor() were lost in the text this
 * was restored from. Both were reconstructed from the function names,
 * the talloc_set_destructor() call in notifyd_peer_new() and the
 * surviving destructor body -- confirm.
 */
static int notifyd_db_del_syswatches(struct db_record *rec, void *private_data)
{
	TDB_DATA key = dbwrap_record_get_key(rec);
	TDB_DATA value = dbwrap_record_get_value(rec);
	struct notifyd_instance *instances = NULL;
	size_t num_instances = 0;
	size_t i;
	bool ok;

	ok = notifyd_parse_entry(value.dptr, value.dsize, &instances,
				 &num_instances);
	if (!ok) {
		DBG_WARNING("Could not parse notifyd entry for %.*s\n",
			    (int)key.dsize, (char *)key.dptr);
		return 0;
	}
	for (i=0; i<num_instances; i++) {
		TALLOC_FREE(instances[i].sys_watch);
	}
	return 0;
}

/*
 * talloc destructor of a peer: removes the sys watches belonging to
 * its database and takes the peer out of state->peers by moving the
 * last array element into its slot.
 */
static int notifyd_peer_destructor(struct notifyd_peer *p)
{
	struct notifyd_state *state = p->state;
	size_t i;

	if (p->db != NULL) {
		dbwrap_traverse_read(p->db, notifyd_db_del_syswatches,
				     NULL, NULL);
	}

	for (i = 0; i<state->num_peers; i++) {
		if (p == state->peers[i]) {
			state->peers[i] = state->peers[state->num_peers-1];
			state->num_peers -= 1;
			break;
		}
	}
	return 0;
}

/*
 * Append a new peer with server id "pid" to state->peers. The peer
 * starts without a database. Returns NULL if an allocation fails.
 */
static struct notifyd_peer *notifyd_peer_new(
	struct notifyd_state *state, struct server_id pid)
{
	struct notifyd_peer *p, **tmp;

	tmp = talloc_realloc(state, state->peers, struct notifyd_peer *,
			     state->num_peers+1);
	if (tmp == NULL) {
		return NULL;
	}
	state->peers = tmp;

	p = talloc_zero(state->peers, struct notifyd_peer);
	if (p == NULL) {
		return NULL;
	}
	p->state = state;
	p->pid = pid;

	state->peers[state->num_peers] = p;
	state->num_peers += 1;

	talloc_set_destructor(p, notifyd_peer_destructor);

	return p;
}

/*
 * Apply a reclog broadcast from "peer" to our copy of its database.
 *
 * Does nothing while the peer's database has not arrived. On a parse
 * error, an unexpected rec_index or a record that cannot be applied,
 * the whole peer is dropped.
 */
static void notifyd_apply_reclog(struct notifyd_peer *peer,
				 const uint8_t *msg, size_t msglen)
{
	struct notifyd_state *state = peer->state;
	DATA_BLOB blob = { .data = discard_const_p(uint8_t, msg),
			   .length = msglen };
	struct server_id_buf idbuf;
	struct messaging_reclog *log;
	enum ndr_err_code ndr_err;
	uint32_t i;

	if (peer->db == NULL) {
		/*
		 * No db yet
		 */
		return;
	}

	log = talloc(peer, struct messaging_reclog);
	if (log == NULL) {
		DBG_DEBUG("talloc failed\n");
		return;
	}

	ndr_err = ndr_pull_struct_blob_all(
		&blob, log, log,
		(ndr_pull_flags_fn_t)ndr_pull_messaging_reclog);
	if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
		DBG_DEBUG("ndr_pull_messaging_reclog failed: %s\n",
			  ndr_errstr(ndr_err));
		goto fail;
	}

	DBG_DEBUG("Got %"PRIu32" recs index %"PRIu64" from %s\n",
		  log->num_recs, log->rec_index,
		  server_id_str_buf(peer->pid, &idbuf));

	if (log->rec_index != peer->rec_index) {
		DBG_INFO("Got rec index %"PRIu64" from %s, "
			 "expected %"PRIu64"\n",
			 log->rec_index,
			 server_id_str_buf(peer->pid, &idbuf),
			 peer->rec_index);
		goto fail;
	}

	for (i=0; i<log->num_recs; i++) {
		struct messaging_rec *r = log->recs[i];
		struct notify_rec_change_msg *chg;
		size_t pathlen;
		bool ok;
		struct notify_instance instance;

		ok = notifyd_parse_rec_change(r->buf.data, r->buf.length,
					      &chg, &pathlen);
		if (!ok) {
			DBG_INFO("notifyd_parse_rec_change failed\n");
			goto fail;
		}

		/* avoid SIGBUS */
		memcpy(&instance, &chg->instance, sizeof(instance));

		ok = notifyd_apply_rec_change(&r->src, chg->path, pathlen,
					      &instance, peer->db,
					      state->sys_notify_watch,
					      state->sys_notify_ctx,
					      state->msg_ctx);
		if (!ok) {
			DBG_INFO("notifyd_apply_rec_change failed\n");
			goto fail;
		}
	}

	peer->rec_index += 1;
	peer->last_broadcast = time(NULL);

	TALLOC_FREE(log);
	return;

fail:
	DBG_DEBUG("Dropping peer %s\n",
		  server_id_str_buf(peer->pid, &idbuf));
	/* "log" is a talloc child of peer and is freed with it */
	TALLOC_FREE(peer);
}

/*
 * Receive messaging_reclog (log of MSG_SMB_NOTIFY_REC_CHANGE
 * messages) broadcasts by other notifyds. Several cases:
 *
 * We don't know the source. This creates a new peer. Creating a peer
 * involves asking the peer for its full database. We assume ordered
 * messages, so the new database will arrive before the next broadcast
 * will.
 *
 * We know the source and the log index matches. We will apply the log
 * locally to our peer's db as if we had received it from a local
 * client.
 *
 * We know the source but the log index does not match. This means we
 * lost a message. We just drop the whole peer and wait for the next
 * broadcast, which will then trigger a fresh database pull.
 */

static int notifyd_snoop_broadcast(struct tevent_context *ev,
				   uint32_t src_vnn, uint32_t dst_vnn,
				   uint64_t dst_srvid,
				   const uint8_t *msg, size_t msglen,
				   void *private_data)
{
	struct notifyd_state *state = talloc_get_type_abort(
		private_data, struct notifyd_state);
	struct server_id my_id = messaging_server_id(state->msg_ctx);
	struct notifyd_peer *p;
	uint32_t i;
	uint32_t msg_type;
	struct server_id src, dst;
	struct server_id_buf idbuf;
	NTSTATUS status;

	if (msglen < MESSAGE_HDR_LENGTH) {
		DBG_DEBUG("Got short broadcast\n");
		return 0;
	}
	message_hdr_get(&msg_type, &src, &dst, msg);

	if (msg_type != MSG_SMB_NOTIFY_REC_CHANGES) {
		DBG_DEBUG("Got message %"PRIu32", ignoring\n", msg_type);
		return 0;
	}
	if (server_id_equal(&src, &my_id)) {
		DBG_DEBUG("Ignoring my own broadcast\n");
		return 0;
	}

	DBG_DEBUG("Got MSG_SMB_NOTIFY_REC_CHANGES from %s\n",
		  server_id_str_buf(src, &idbuf));

	for (i=0; i<state->num_peers; i++) {
		if (server_id_equal(&state->peers[i]->pid, &src)) {

			DBG_DEBUG("Applying changes to peer %"PRIu32"\n", i);

			notifyd_apply_reclog(state->peers[i],
					     msg + MESSAGE_HDR_LENGTH,
					     msglen - MESSAGE_HDR_LENGTH);
			return 0;
		}
	}

	DBG_DEBUG("Creating new peer for %s\n",
		  server_id_str_buf(src, &idbuf));

	p = notifyd_peer_new(state, src);
	if (p == NULL) {
		DBG_DEBUG("notifyd_peer_new failed\n");
		return 0;
	}

	status = messaging_send_buf(state->msg_ctx, src,
				    MSG_SMB_NOTIFY_GET_DB, NULL, 0);
	if (!NT_STATUS_IS_OK(status)) {
		DBG_DEBUG("messaging_send_buf failed: %s\n",
			  nt_errstr(status));
		TALLOC_FREE(p);
		return 0;
	}

	return 0;
}
#endif