/* * Unix SMB/CIFS implementation. * Test async ctdb_req_send/recv * Copyright (C) Volker Lendecke 2020 * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation; either version 3 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program. If not, see . */ #include "includes.h" #include "torture/proto.h" #include "ctdbd_conn.h" #include "lib/cluster_support.h" #include "ctdb/include/ctdb_protocol.h" #include "lib/util/tevent_unix.h" extern int torture_nprocs; extern int torture_numops; struct ctdb_echo_state { struct ctdb_req_control_old req; struct iovec iov[2]; TDB_DATA echodata; }; static void ctdb_echo_done(struct tevent_req *subreq); static struct tevent_req *ctdb_echo_send( TALLOC_CTX *mem_ctx, struct tevent_context *ev, struct ctdbd_connection *conn, uint32_t delay) { struct tevent_req *req = NULL, *subreq = NULL; struct ctdb_echo_state *state = NULL; struct ctdb_req_header *hdr = NULL; uint32_t datalen; req = tevent_req_create( mem_ctx, &state, struct ctdb_echo_state); if (req == NULL) { return NULL; } hdr = &state->req.hdr; ctdbd_prep_hdr_next_reqid(conn, hdr); hdr->operation = CTDB_REQ_CONTROL; state->req.opcode = CTDB_CONTROL_ECHO_DATA; state->iov[0] = (struct iovec) { .iov_base = &state->req, .iov_len = offsetof(struct ctdb_req_control_old, data), }; datalen = generate_random() % 1024; state->echodata.dptr = talloc_array(state, uint8_t, datalen+8); if (tevent_req_nomem(state->echodata.dptr, req)) { return tevent_req_post(req, ev); } state->echodata.dsize = talloc_get_size(state->echodata.dptr); generate_random_buffer( state->echodata.dptr, state->echodata.dsize); memcpy(state->echodata.dptr, &delay, sizeof(delay)); memcpy(state->echodata.dptr+4, &datalen, sizeof(datalen)); state->req.datalen = state->echodata.dsize; state->iov[1] = (struct iovec) { .iov_base = state->echodata.dptr, .iov_len = state->echodata.dsize, }; hdr->length = offsetof(struct ctdb_req_control_old, data) + state->req.datalen; subreq = ctdbd_req_send( state, ev, conn, state->iov, ARRAY_SIZE(state->iov)); if (tevent_req_nomem(subreq, req)) { return tevent_req_post(req, ev); } tevent_req_set_callback(subreq, ctdb_echo_done, req); return req; } static void ctdb_echo_done(struct tevent_req *subreq) { struct tevent_req *req = tevent_req_callback_data( subreq, struct tevent_req); struct ctdb_echo_state *state = tevent_req_data( req, struct ctdb_echo_state); struct ctdb_req_header *hdr = NULL; struct ctdb_reply_control_old *reply = NULL; int cmp, ret; ret = ctdbd_req_recv(subreq, state, &hdr); TALLOC_FREE(subreq); if (tevent_req_error(req, ret)) { printf("ctdbd_req_recv(%"PRIu32") returned %d (%s)\n", state->req.hdr.reqid, ret, strerror(ret)); return; } if (hdr->operation != CTDB_REPLY_CONTROL) { printf("Expected CTDB_REPLY_CONTROL, got %"PRIu32"\n", hdr->operation); tevent_req_error(req, EIO); return; } reply = (struct ctdb_reply_control_old *)hdr; if (reply->status != 0) { printf("reply->status = %"PRIi32"\n", reply->status); tevent_req_error(req, EIO); return; } if (reply->datalen != state->req.datalen) { printf("state->echodata.dsize=%zu datalen=%"PRIu32"\n", state->echodata.dsize, reply->datalen); tevent_req_error(req, EIO); return; } cmp = memcmp(reply->data, state->echodata.dptr, state->echodata.dsize); if (cmp != 0) { printf("data mismatch\n"); tevent_req_error(req, EIO); return; } TALLOC_FREE(reply); tevent_req_done(req); } static int ctdb_echo_recv(struct tevent_req *req) { return tevent_req_simple_recv_unix(req); } struct ctdb_ping_flood_state { struct tevent_context *ev; struct ctdbd_connection *conn; size_t num_running; bool done; }; static void ctdb_ping_flood_next(struct tevent_req *subreq); static void ctdb_ping_flood_done(struct tevent_req *subreq); static struct tevent_req *ctdb_ping_flood_send( TALLOC_CTX *mem_ctx, struct tevent_context *ev, struct ctdbd_connection *conn, size_t num_parallel, unsigned usecs) { struct tevent_req *req = NULL, *subreq = NULL; struct ctdb_ping_flood_state *state = NULL; size_t i; req = tevent_req_create( mem_ctx, &state, struct ctdb_ping_flood_state); if (req == NULL) { return NULL; } state->ev = ev; state->conn = conn; for (i=0; iev, state->conn, generate_random() % 10); if (tevent_req_nomem(subreq, req)) { return tevent_req_post(req, ev); } tevent_req_set_callback(subreq, ctdb_ping_flood_next, req); } state->num_running = num_parallel; subreq = tevent_wakeup_send( state, ev, tevent_timeval_current_ofs(0, usecs)); if (tevent_req_nomem(subreq, req)) { return tevent_req_post(req, ev); } tevent_req_set_callback(subreq, ctdb_ping_flood_done, req); return req; } static void ctdb_ping_flood_next(struct tevent_req *subreq) { struct tevent_req *req = tevent_req_callback_data( subreq, struct tevent_req); struct ctdb_ping_flood_state *state = tevent_req_data( req, struct ctdb_ping_flood_state); int ret; ret = ctdb_echo_recv(subreq); TALLOC_FREE(subreq); if (tevent_req_error(req, ret)) { return; } state->num_running -= 1; if (state->done) { if (state->num_running == 0) { tevent_req_done(req); } return; } subreq = ctdb_echo_send( state, state->ev, state->conn, generate_random() % 10); if (tevent_req_nomem(subreq, req)) { return; } tevent_req_set_callback(subreq, ctdb_ping_flood_next, req); state->num_running += 1; } static void ctdb_ping_flood_done(struct tevent_req *subreq) { struct tevent_req *req = tevent_req_callback_data( subreq, struct tevent_req); struct ctdb_ping_flood_state *state = tevent_req_data( req, struct ctdb_ping_flood_state); bool ok; ok = tevent_wakeup_recv(subreq); TALLOC_FREE(subreq); if (!ok) { tevent_req_oom(req); return; } state->done = true; } static int ctdb_ping_flood_recv(struct tevent_req *req) { return tevent_req_simple_recv_unix(req); } bool run_ctdbd_conn1(int dummy) { struct ctdbd_connection *conn = NULL; struct tevent_context *ev = NULL; struct tevent_req *req = NULL; int ret; bool ok; bool result = false; ev = samba_tevent_context_init(talloc_tos()); if (ev == NULL) { printf("samba_tevent_context_init failed\n"); goto done; } ret = ctdbd_init_async_connection( ev, lp_ctdbd_socket(), 0, &conn); if (ret != 0) { printf("ctdbd_init_async_connection failed: %s\n", strerror(ret)); goto done; } req = ctdb_ping_flood_send( ev, ev, conn, torture_nprocs, torture_numops * 1000); if (req == NULL) { printf("ctdb_ping_flood_send failed\n"); goto done; } ok = tevent_req_poll_unix(req, ev, &ret); if (!ok) { printf("tevent_req_poll_unix failed: %s\n", strerror(ret)); goto done; } ret = ctdb_ping_flood_recv(req); TALLOC_FREE(req); if (ret != 0) { printf("ctdb_ping_flood failed: %s\n", strerror(ret)); goto done; } result = true; done: TALLOC_FREE(conn); return result; }