summaryrefslogtreecommitdiffstats
path: root/wsrep-lib/wsrep-API/v26/examples/node/trx.c
blob: afdcada492660841e059c29a343866cf8b65a299 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
/* Copyright (c) 2019-2020, Codership Oy. All rights reserved.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; version 2 of the License.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
 */

#include "trx.h"
#include "log.h"

#include <assert.h>
#include <errno.h>  // ENOMEM, etc.
#include <stdbool.h>

wsrep_status_t
node_trx_execute(node_store_t*   const store,
                 wsrep_t*        const wsrep,
                 wsrep_conn_id_t const conn_id,
                 int                   ops_num)
{
    wsrep_status_t cert = WSREP_OK; // for cleanup

    static unsigned int const ws_flags =
        WSREP_FLAG_TRX_START | WSREP_FLAG_TRX_END; // atomic trx
    wsrep_trx_meta_t ws_meta;
    wsrep_status_t ret = WSREP_OK;

    /* prepare simple transaction and obtain a writeset handle for it */
    wsrep_ws_handle_t ws_handle = { 0, NULL };
    while (ops_num--)
    {
        if (0 != (ret = node_store_execute(store, wsrep, &ws_handle)))
        {
#if 0
            NODE_INFO("master [%d]: node_store_execute() returned %d",
                      conn_id, ret);
#endif
            ret = WSREP_TRX_FAIL;
            goto cleanup;
        }
    }

    /* REPLICATION: (replicate and) certify the writeset (pointed to by
     *              ws_handle) with the cluster */
    cert = wsrep->certify(wsrep, conn_id, &ws_handle, ws_flags, &ws_meta);

    if (WSREP_BF_ABORT == cert)
    {
        /* REPLICATION: transaction was signaled to abort due to multi-master
         *              conflict. It must rollback immediately: it blocks
         *              transaction that was ordered earlier and will never
         *              be able to enter commit order. */
        node_store_rollback(store, ws_handle.trx_id);
    }

    /* REPLICATION: writeset was totally ordered, need to enter commit order */
    if (ws_meta.gtid.seqno > 0)
    {
        ret = wsrep->commit_order_enter(wsrep, &ws_handle, &ws_meta);
        if (ret)
        {
            NODE_ERROR("master [%d]: wsrep::commit_order_enter(%lld) failed: "
                       "%d", (long long)(ws_meta.gtid.seqno), ret);
            goto cleanup;
        }

        /* REPLICATION: inside commit monitor
         * Note: we commit transaction only if certification succeded */
        if (WSREP_OK == cert)
            node_store_commit(store, ws_handle.trx_id, &ws_meta.gtid);
        else
            node_store_update_gtid(store, &ws_meta.gtid);

        ret = wsrep->commit_order_leave(wsrep, &ws_handle, &ws_meta, NULL);
        if (ret)
        {
            NODE_ERROR("master [%d]: wsrep::commit_order_leave(%lld) failed: "
                       "%d", (long long)(ws_meta.gtid.seqno), ret);
            goto cleanup;
        }
    }
    else
    {
        assert(cert);
    }

cleanup:
    /* REPLICATION: if wsrep->certify() returned anything else but WSREP_OK
     *              transaction must roll back. BF aborted trx already did it. */
    if (cert && WSREP_BF_ABORT != cert)
        node_store_rollback(store, ws_handle.trx_id);

    /* NOTE: this application follows the approach that resources must be freed
     *       at the same level where they were allocated, so it is assumed that
     *       ws_key and ws were deallocated in either commit or rollback calls.*/

    /* REPLICATION: release provider resources associated with the trx */
    wsrep->release(wsrep, &ws_handle);

    return ret ? ret : cert;
}

wsrep_status_t
node_trx_apply(node_store_t*            const store,
               wsrep_t*                 const wsrep,
               const wsrep_ws_handle_t* const ws_handle,
               const wsrep_trx_meta_t*  const ws_meta,
               const wsrep_buf_t*       const ws)
{
    /* no business being here if event was not ordered */
    assert(ws_meta->gtid.seqno > 0);

    wsrep_trx_id_t trx_id;
    wsrep_buf_t err_buf = { NULL, 0 };
    int app_err;
    if (ws)
    {
        app_err = node_store_apply(store, &trx_id, ws);
        if (app_err)
        {
            /* REPLICATION: if applying failed, prepare an error buffer with
             *              sufficient error specification */
            err_buf.ptr = &app_err; // suppose error code is enough
            err_buf.len = sizeof(app_err);
        }
    }
    else /* ws failed certification and should be skipped */
    {
        /* just some non-0 code to choose node_store_update_gtid() below */
        app_err = 1;
    }

    wsrep_status_t ret;
    ret = wsrep->commit_order_enter(wsrep, ws_handle, ws_meta);
    if (ret) {
        node_store_rollback(store, trx_id);
        return ret;
    }

    if (!app_err) node_store_commit(store, trx_id, &ws_meta->gtid);
    else          node_store_update_gtid(store, &ws_meta->gtid);

    ret = wsrep->commit_order_leave(wsrep, ws_handle, ws_meta, &err_buf);

    return ret;
}