diff options
Diffstat (limited to 'src/rgw/rgw_lua_background.cc')
-rw-r--r-- | src/rgw/rgw_lua_background.cc | 181 |
1 files changed, 181 insertions, 0 deletions
diff --git a/src/rgw/rgw_lua_background.cc b/src/rgw/rgw_lua_background.cc new file mode 100644 index 000000000..35de4a7e9 --- /dev/null +++ b/src/rgw/rgw_lua_background.cc @@ -0,0 +1,181 @@ +#include "rgw_sal_rados.h" +#include "rgw_lua_background.h" +#include "rgw_lua.h" +#include "rgw_lua_utils.h" +#include "rgw_perf_counters.h" +#include "include/ceph_assert.h" +#include <lua.hpp> + +#define dout_subsys ceph_subsys_rgw + +namespace rgw::lua { + +const char* RGWTable::INCREMENT = "increment"; +const char* RGWTable::DECREMENT = "decrement"; + +int RGWTable::increment_by(lua_State* L) { + const auto map = reinterpret_cast<BackgroundMap*>(lua_touserdata(L, lua_upvalueindex(FIRST_UPVAL))); + auto& mtx = *reinterpret_cast<std::mutex*>(lua_touserdata(L, lua_upvalueindex(SECOND_UPVAL))); + auto decrement = lua_toboolean(L, lua_upvalueindex(THIRD_UPVAL)); + + const auto args = lua_gettop(L); + const auto index = luaL_checkstring(L, 1); + + // by default we increment by 1/-1 + const long long int default_inc = (decrement ? -1 : 1); + BackgroundMapValue inc_by = default_inc; + if (args == 2) { + if (lua_isinteger(L, 2)) { + inc_by = lua_tointeger(L, 2)*default_inc; + } else if (lua_isnumber(L, 2)){ + inc_by = lua_tonumber(L, 2)*static_cast<double>(default_inc); + } else { + return luaL_error(L, "can increment only by numeric values"); + } + } + + std::unique_lock l(mtx); + + const auto it = map->find(std::string(index)); + if (it != map->end()) { + auto& value = it->second; + if (std::holds_alternative<double>(value) && std::holds_alternative<double>(inc_by)) { + value = std::get<double>(value) + std::get<double>(inc_by); + } else if (std::holds_alternative<long long int>(value) && std::holds_alternative<long long int>(inc_by)) { + value = std::get<long long int>(value) + std::get<long long int>(inc_by); + } else if (std::holds_alternative<double>(value) && std::holds_alternative<long long int>(inc_by)) { + value = std::get<double>(value) + static_cast<double>(std::get<long long int>(inc_by)); + } else if (std::holds_alternative<long long int>(value) && std::holds_alternative<double>(inc_by)) { + value = static_cast<double>(std::get<long long int>(value)) + std::get<double>(inc_by); + } else { + mtx.unlock(); + return luaL_error(L, "can increment only numeric values"); + } + } + + return 0; +} + +Background::Background(rgw::sal::Driver* driver, + CephContext* cct, + const std::string& luarocks_path, + int execute_interval) : + execute_interval(execute_interval), + dp(cct, dout_subsys, "lua background: "), + lua_manager(driver->get_lua_manager()), + cct(cct), + luarocks_path(luarocks_path) {} + +void Background::shutdown(){ + stopped = true; + cond.notify_all(); + if (runner.joinable()) { + runner.join(); + } + started = false; + stopped = false; +} + +void Background::start() { + if (started) { + // start the thread only once + return; + } + started = true; + runner = std::thread(&Background::run, this); + const auto rc = ceph_pthread_setname(runner.native_handle(), + "lua_background"); + ceph_assert(rc == 0); +} + +void Background::pause() { + { + std::unique_lock cond_lock(pause_mutex); + paused = true; + } + cond.notify_all(); +} + +void Background::resume(rgw::sal::Driver* driver) { + lua_manager = driver->get_lua_manager(); + paused = false; + cond.notify_all(); +} + +int Background::read_script() { + std::unique_lock cond_lock(pause_mutex); + if (paused) { + return -EAGAIN; + } + std::string tenant; + return rgw::lua::read_script(&dp, lua_manager.get(), tenant, null_yield, rgw::lua::context::background, rgw_script); +} + +const BackgroundMapValue Background::empty_table_value; + +const BackgroundMapValue& Background::get_table_value(const std::string& key) const { + std::unique_lock cond_lock(table_mutex); + const auto it = rgw_map.find(key); + if (it == rgw_map.end()) { + return empty_table_value; + } + return it->second; +} + +//(1) Loads the script from the object if not paused +//(2) Executes the script +//(3) Sleep (configurable) +void Background::run() { + lua_State* const L = luaL_newstate(); + rgw::lua::lua_state_guard lguard(L); + open_standard_libs(L); + set_package_path(L, luarocks_path); + create_debug_action(L, cct); + create_background_metatable(L); + const DoutPrefixProvider* const dpp = &dp; + + while (!stopped) { + if (paused) { + ldpp_dout(dpp, 10) << "Lua background thread paused" << dendl; + std::unique_lock cond_lock(cond_mutex); + cond.wait(cond_lock, [this]{return !paused || stopped;}); + if (stopped) { + ldpp_dout(dpp, 10) << "Lua background thread stopped" << dendl; + return; + } + ldpp_dout(dpp, 10) << "Lua background thread resumed" << dendl; + } + const auto rc = read_script(); + if (rc == -ENOENT || rc == -EAGAIN) { + // either no script or paused, nothing to do + } else if (rc < 0) { + ldpp_dout(dpp, 1) << "WARNING: failed to read background script. error " << rc << dendl; + } else { + auto failed = false; + try { + //execute the background lua script + if (luaL_dostring(L, rgw_script.c_str()) != LUA_OK) { + const std::string err(lua_tostring(L, -1)); + ldpp_dout(dpp, 1) << "Lua ERROR: " << err << dendl; + failed = true; + } + } catch (const std::exception& e) { + ldpp_dout(dpp, 1) << "Lua ERROR: " << e.what() << dendl; + failed = true; + } + if (perfcounter) { + perfcounter->inc((failed ? l_rgw_lua_script_fail : l_rgw_lua_script_ok), 1); + } + } + std::unique_lock cond_lock(cond_mutex); + cond.wait_for(cond_lock, std::chrono::seconds(execute_interval), [this]{return stopped;}); + } + ldpp_dout(dpp, 10) << "Lua background thread stopped" << dendl; +} + +void Background::create_background_metatable(lua_State* L) { + create_metatable<rgw::lua::RGWTable>(L, true, &rgw_map, &table_mutex); +} + +} //namespace rgw::lua + |