/* Copyright (c) 2019-2020, Codership Oy. All rights reserved.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; version 2 of the License.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
 */

#include "trx.h"
#include "log.h"

#include <assert.h>
#include <errno.h>  // ENOMEM, etc.
#include <stdbool.h>

wsrep_status_t
node_trx_execute(node_store_t*   const store,
                 wsrep_t*        const wsrep,
                 wsrep_conn_id_t const conn_id,
                 int                   ops_num)
{
    wsrep_status_t cert = WSREP_OK; // for cleanup

    static unsigned int const ws_flags =
        WSREP_FLAG_TRX_START | WSREP_FLAG_TRX_END; // atomic trx
    wsrep_trx_meta_t ws_meta;
    wsrep_status_t ret = WSREP_OK;

    /* prepare simple transaction and obtain a writeset handle for it */
    wsrep_ws_handle_t ws_handle = { 0, NULL };
    while (ops_num--)
    {
        if (0 != (ret = node_store_execute(store, wsrep, &ws_handle)))
        {
#if 0
            NODE_INFO("master [%d]: node_store_execute() returned %d",
                      conn_id, ret);
#endif
            ret = WSREP_TRX_FAIL;
            goto cleanup;
        }
    }

    /* REPLICATION: (replicate and) certify the writeset (pointed to by
     *              ws_handle) with the cluster */
    cert = wsrep->certify(wsrep, conn_id, &ws_handle, ws_flags, &ws_meta);

    if (WSREP_BF_ABORT == cert)
    {
        /* REPLICATION: transaction was signaled to abort due to multi-master
         *              conflict. It must rollback immediately: it blocks
         *              transaction that was ordered earlier and will never
         *              be able to enter commit order. */
        node_store_rollback(store, ws_handle.trx_id);
    }

    /* REPLICATION: writeset was totally ordered, need to enter commit order */
    if (ws_meta.gtid.seqno > 0)
    {
        ret = wsrep->commit_order_enter(wsrep, &ws_handle, &ws_meta);
        if (ret)
        {
            NODE_ERROR("master [%d]: wsrep::commit_order_enter(%lld) failed: "
                       "%d", (long long)(ws_meta.gtid.seqno), ret);
            goto cleanup;
        }

        /* REPLICATION: inside commit monitor
         * Note: we commit transaction only if certification succeded */
        if (WSREP_OK == cert)
            node_store_commit(store, ws_handle.trx_id, &ws_meta.gtid);
        else
            node_store_update_gtid(store, &ws_meta.gtid);

        ret = wsrep->commit_order_leave(wsrep, &ws_handle, &ws_meta, NULL);
        if (ret)
        {
            NODE_ERROR("master [%d]: wsrep::commit_order_leave(%lld) failed: "
                       "%d", (long long)(ws_meta.gtid.seqno), ret);
            goto cleanup;
        }
    }
    else
    {
        assert(cert);
    }

cleanup:
    /* REPLICATION: if wsrep->certify() returned anything else but WSREP_OK
     *              transaction must roll back. BF aborted trx already did it. */
    if (cert && WSREP_BF_ABORT != cert)
        node_store_rollback(store, ws_handle.trx_id);

    /* NOTE: this application follows the approach that resources must be freed
     *       at the same level where they were allocated, so it is assumed that
     *       ws_key and ws were deallocated in either commit or rollback calls.*/

    /* REPLICATION: release provider resources associated with the trx */
    wsrep->release(wsrep, &ws_handle);

    return ret ? ret : cert;
}

wsrep_status_t
node_trx_apply(node_store_t*            const store,
               wsrep_t*                 const wsrep,
               const wsrep_ws_handle_t* const ws_handle,
               const wsrep_trx_meta_t*  const ws_meta,
               const wsrep_buf_t*       const ws)
{
    /* no business being here if event was not ordered */
    assert(ws_meta->gtid.seqno > 0);

    wsrep_trx_id_t trx_id;
    wsrep_buf_t err_buf = { NULL, 0 };
    int app_err;
    if (ws)
    {
        app_err = node_store_apply(store, &trx_id, ws);
        if (app_err)
        {
            /* REPLICATION: if applying failed, prepare an error buffer with
             *              sufficient error specification */
            err_buf.ptr = &app_err; // suppose error code is enough
            err_buf.len = sizeof(app_err);
        }
    }
    else /* ws failed certification and should be skipped */
    {
        /* just some non-0 code to choose node_store_update_gtid() below */
        app_err = 1;
    }

    wsrep_status_t ret;
    ret = wsrep->commit_order_enter(wsrep, ws_handle, ws_meta);
    if (ret) {
        node_store_rollback(store, trx_id);
        return ret;
    }

    if (!app_err) node_store_commit(store, trx_id, &ws_meta->gtid);
    else          node_store_update_gtid(store, &ws_meta->gtid);

    ret = wsrep->commit_order_leave(wsrep, ws_handle, ws_meta, &err_buf);

    return ret;
}