From e6918187568dbd01842d8d1d2c808ce16a894239 Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sun, 21 Apr 2024 13:54:28 +0200 Subject: Adding upstream version 18.2.2. Signed-off-by: Daniel Baumann --- src/rocksdb/java/rocksjni/comparatorjnicallback.cc | 646 +++++++++++++++++++++ 1 file changed, 646 insertions(+) create mode 100644 src/rocksdb/java/rocksjni/comparatorjnicallback.cc (limited to 'src/rocksdb/java/rocksjni/comparatorjnicallback.cc') diff --git a/src/rocksdb/java/rocksjni/comparatorjnicallback.cc b/src/rocksdb/java/rocksjni/comparatorjnicallback.cc new file mode 100644 index 000000000..07ab9fa41 --- /dev/null +++ b/src/rocksdb/java/rocksjni/comparatorjnicallback.cc @@ -0,0 +1,646 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// This file implements the callback "bridge" between Java and C++ for +// ROCKSDB_NAMESPACE::Comparator. + +#include "rocksjni/comparatorjnicallback.h" + +#include "rocksjni/portal.h" + +namespace ROCKSDB_NAMESPACE { +ComparatorJniCallback::ComparatorJniCallback( + JNIEnv* env, jobject jcomparator, + const ComparatorJniCallbackOptions* options) + : JniCallback(env, jcomparator), m_options(options) { + // cache the AbstractComparatorJniBridge class as we will reuse it many times + // for each callback + m_abstract_comparator_jni_bridge_clazz = static_cast( + env->NewGlobalRef(AbstractComparatorJniBridge::getJClass(env))); + + // Note: The name of a Comparator will not change during it's lifetime, + // so we cache it in a global var + jmethodID jname_mid = AbstractComparatorJni::getNameMethodId(env); + if (jname_mid == nullptr) { + // exception thrown: NoSuchMethodException or OutOfMemoryError + return; + } + jstring js_name = (jstring)env->CallObjectMethod(m_jcallback_obj, jname_mid); + if (env->ExceptionCheck()) { + // exception thrown + return; + } + jboolean has_exception = JNI_FALSE; + m_name = JniUtil::copyString(env, js_name, + &has_exception); // also releases jsName + if (has_exception == JNI_TRUE) { + // exception thrown + return; + } + + // cache the ByteBuffer class as we will reuse it many times for each callback + m_jbytebuffer_clazz = + static_cast(env->NewGlobalRef(ByteBufferJni::getJClass(env))); + + m_jcompare_mid = AbstractComparatorJniBridge::getCompareInternalMethodId( + env, m_abstract_comparator_jni_bridge_clazz); + if (m_jcompare_mid == nullptr) { + // exception thrown: NoSuchMethodException or OutOfMemoryError + return; + } + + m_jshortest_mid = + AbstractComparatorJniBridge::getFindShortestSeparatorInternalMethodId( + env, m_abstract_comparator_jni_bridge_clazz); + if (m_jshortest_mid == nullptr) { + // exception thrown: NoSuchMethodException or OutOfMemoryError + return; + } + + m_jshort_mid = + AbstractComparatorJniBridge::getFindShortSuccessorInternalMethodId( + env, m_abstract_comparator_jni_bridge_clazz); + if (m_jshort_mid == nullptr) { + // exception thrown: NoSuchMethodException or OutOfMemoryError + return; + } + + // do we need reusable buffers? + if (m_options->max_reused_buffer_size > -1) { + if (m_options->reused_synchronisation_type == + ReusedSynchronisationType::THREAD_LOCAL) { + // buffers reused per thread + UnrefHandler unref = [](void* ptr) { + ThreadLocalBuf* tlb = reinterpret_cast(ptr); + jboolean attached_thread = JNI_FALSE; + JNIEnv* _env = JniUtil::getJniEnv(tlb->jvm, &attached_thread); + if (_env != nullptr) { + if (tlb->direct_buffer) { + void* buf = _env->GetDirectBufferAddress(tlb->jbuf); + delete[] static_cast(buf); + } + _env->DeleteGlobalRef(tlb->jbuf); + JniUtil::releaseJniEnv(tlb->jvm, attached_thread); + } + }; + + m_tl_buf_a = new ThreadLocalPtr(unref); + m_tl_buf_b = new ThreadLocalPtr(unref); + + m_jcompare_buf_a = nullptr; + m_jcompare_buf_b = nullptr; + m_jshortest_buf_start = nullptr; + m_jshortest_buf_limit = nullptr; + m_jshort_buf_key = nullptr; + + } else { + // buffers reused and shared across threads + const bool adaptive = m_options->reused_synchronisation_type == + ReusedSynchronisationType::ADAPTIVE_MUTEX; + mtx_compare = std::unique_ptr(new port::Mutex(adaptive)); + mtx_shortest = std::unique_ptr(new port::Mutex(adaptive)); + mtx_short = std::unique_ptr(new port::Mutex(adaptive)); + + m_jcompare_buf_a = env->NewGlobalRef(ByteBufferJni::construct( + env, m_options->direct_buffer, m_options->max_reused_buffer_size, + m_jbytebuffer_clazz)); + if (m_jcompare_buf_a == nullptr) { + // exception thrown: OutOfMemoryError + return; + } + + m_jcompare_buf_b = env->NewGlobalRef(ByteBufferJni::construct( + env, m_options->direct_buffer, m_options->max_reused_buffer_size, + m_jbytebuffer_clazz)); + if (m_jcompare_buf_b == nullptr) { + // exception thrown: OutOfMemoryError + return; + } + + m_jshortest_buf_start = env->NewGlobalRef(ByteBufferJni::construct( + env, m_options->direct_buffer, m_options->max_reused_buffer_size, + m_jbytebuffer_clazz)); + if (m_jshortest_buf_start == nullptr) { + // exception thrown: OutOfMemoryError + return; + } + + m_jshortest_buf_limit = env->NewGlobalRef(ByteBufferJni::construct( + env, m_options->direct_buffer, m_options->max_reused_buffer_size, + m_jbytebuffer_clazz)); + if (m_jshortest_buf_limit == nullptr) { + // exception thrown: OutOfMemoryError + return; + } + + m_jshort_buf_key = env->NewGlobalRef(ByteBufferJni::construct( + env, m_options->direct_buffer, m_options->max_reused_buffer_size, + m_jbytebuffer_clazz)); + if (m_jshort_buf_key == nullptr) { + // exception thrown: OutOfMemoryError + return; + } + + m_tl_buf_a = nullptr; + m_tl_buf_b = nullptr; + } + + } else { + m_jcompare_buf_a = nullptr; + m_jcompare_buf_b = nullptr; + m_jshortest_buf_start = nullptr; + m_jshortest_buf_limit = nullptr; + m_jshort_buf_key = nullptr; + + m_tl_buf_a = nullptr; + m_tl_buf_b = nullptr; + } +} + +ComparatorJniCallback::~ComparatorJniCallback() { + jboolean attached_thread = JNI_FALSE; + JNIEnv* env = getJniEnv(&attached_thread); + assert(env != nullptr); + + env->DeleteGlobalRef(m_abstract_comparator_jni_bridge_clazz); + + env->DeleteGlobalRef(m_jbytebuffer_clazz); + + if (m_jcompare_buf_a != nullptr) { + if (m_options->direct_buffer) { + void* buf = env->GetDirectBufferAddress(m_jcompare_buf_a); + delete[] static_cast(buf); + } + env->DeleteGlobalRef(m_jcompare_buf_a); + } + + if (m_jcompare_buf_b != nullptr) { + if (m_options->direct_buffer) { + void* buf = env->GetDirectBufferAddress(m_jcompare_buf_b); + delete[] static_cast(buf); + } + env->DeleteGlobalRef(m_jcompare_buf_b); + } + + if (m_jshortest_buf_start != nullptr) { + if (m_options->direct_buffer) { + void* buf = env->GetDirectBufferAddress(m_jshortest_buf_start); + delete[] static_cast(buf); + } + env->DeleteGlobalRef(m_jshortest_buf_start); + } + + if (m_jshortest_buf_limit != nullptr) { + if (m_options->direct_buffer) { + void* buf = env->GetDirectBufferAddress(m_jshortest_buf_limit); + delete[] static_cast(buf); + } + env->DeleteGlobalRef(m_jshortest_buf_limit); + } + + if (m_jshort_buf_key != nullptr) { + if (m_options->direct_buffer) { + void* buf = env->GetDirectBufferAddress(m_jshort_buf_key); + delete[] static_cast(buf); + } + env->DeleteGlobalRef(m_jshort_buf_key); + } + + if (m_tl_buf_a != nullptr) { + delete m_tl_buf_a; + } + + if (m_tl_buf_b != nullptr) { + delete m_tl_buf_b; + } + + releaseJniEnv(attached_thread); +} + +const char* ComparatorJniCallback::Name() const { return m_name.get(); } + +int ComparatorJniCallback::Compare(const Slice& a, const Slice& b) const { + jboolean attached_thread = JNI_FALSE; + JNIEnv* env = getJniEnv(&attached_thread); + assert(env != nullptr); + + const bool reuse_jbuf_a = + static_cast(a.size()) <= m_options->max_reused_buffer_size; + const bool reuse_jbuf_b = + static_cast(b.size()) <= m_options->max_reused_buffer_size; + + MaybeLockForReuse(mtx_compare, reuse_jbuf_a || reuse_jbuf_b); + + jobject jcompare_buf_a = + GetBuffer(env, a, reuse_jbuf_a, m_tl_buf_a, m_jcompare_buf_a); + if (jcompare_buf_a == nullptr) { + // exception occurred + MaybeUnlockForReuse(mtx_compare, reuse_jbuf_a || reuse_jbuf_b); + env->ExceptionDescribe(); // print out exception to stderr + releaseJniEnv(attached_thread); + return 0; + } + + jobject jcompare_buf_b = + GetBuffer(env, b, reuse_jbuf_b, m_tl_buf_b, m_jcompare_buf_b); + if (jcompare_buf_b == nullptr) { + // exception occurred + if (!reuse_jbuf_a) { + DeleteBuffer(env, jcompare_buf_a); + } + MaybeUnlockForReuse(mtx_compare, reuse_jbuf_a || reuse_jbuf_b); + env->ExceptionDescribe(); // print out exception to stderr + releaseJniEnv(attached_thread); + return 0; + } + + jint result = env->CallStaticIntMethod( + m_abstract_comparator_jni_bridge_clazz, m_jcompare_mid, m_jcallback_obj, + jcompare_buf_a, reuse_jbuf_a ? a.size() : -1, jcompare_buf_b, + reuse_jbuf_b ? b.size() : -1); + + if (env->ExceptionCheck()) { + // exception thrown from CallIntMethod + env->ExceptionDescribe(); // print out exception to stderr + result = 0; // we could not get a result from java callback so use 0 + } + + if (!reuse_jbuf_a) { + DeleteBuffer(env, jcompare_buf_a); + } + if (!reuse_jbuf_b) { + DeleteBuffer(env, jcompare_buf_b); + } + + MaybeUnlockForReuse(mtx_compare, reuse_jbuf_a || reuse_jbuf_b); + + releaseJniEnv(attached_thread); + + return result; +} + +void ComparatorJniCallback::FindShortestSeparator(std::string* start, + const Slice& limit) const { + if (start == nullptr) { + return; + } + + jboolean attached_thread = JNI_FALSE; + JNIEnv* env = getJniEnv(&attached_thread); + assert(env != nullptr); + + const bool reuse_jbuf_start = static_cast(start->length()) <= + m_options->max_reused_buffer_size; + const bool reuse_jbuf_limit = + static_cast(limit.size()) <= m_options->max_reused_buffer_size; + + MaybeLockForReuse(mtx_shortest, reuse_jbuf_start || reuse_jbuf_limit); + + Slice sstart(start->data(), start->length()); + jobject j_start_buf = GetBuffer(env, sstart, reuse_jbuf_start, m_tl_buf_a, + m_jshortest_buf_start); + if (j_start_buf == nullptr) { + // exception occurred + MaybeUnlockForReuse(mtx_shortest, reuse_jbuf_start || reuse_jbuf_limit); + env->ExceptionDescribe(); // print out exception to stderr + releaseJniEnv(attached_thread); + return; + } + + jobject j_limit_buf = GetBuffer(env, limit, reuse_jbuf_limit, m_tl_buf_b, + m_jshortest_buf_limit); + if (j_limit_buf == nullptr) { + // exception occurred + if (!reuse_jbuf_start) { + DeleteBuffer(env, j_start_buf); + } + MaybeUnlockForReuse(mtx_shortest, reuse_jbuf_start || reuse_jbuf_limit); + env->ExceptionDescribe(); // print out exception to stderr + releaseJniEnv(attached_thread); + return; + } + + jint jstart_len = env->CallStaticIntMethod( + m_abstract_comparator_jni_bridge_clazz, m_jshortest_mid, m_jcallback_obj, + j_start_buf, reuse_jbuf_start ? start->length() : -1, j_limit_buf, + reuse_jbuf_limit ? limit.size() : -1); + + if (env->ExceptionCheck()) { + // exception thrown from CallIntMethod + env->ExceptionDescribe(); // print out exception to stderr + + } else if (static_cast(jstart_len) != start->length()) { + // start buffer has changed in Java, so update `start` with the result + bool copy_from_non_direct = false; + if (reuse_jbuf_start) { + // reused a buffer + if (m_options->direct_buffer) { + // reused direct buffer + void* start_buf = env->GetDirectBufferAddress(j_start_buf); + if (start_buf == nullptr) { + if (!reuse_jbuf_start) { + DeleteBuffer(env, j_start_buf); + } + if (!reuse_jbuf_limit) { + DeleteBuffer(env, j_limit_buf); + } + MaybeUnlockForReuse(mtx_shortest, + reuse_jbuf_start || reuse_jbuf_limit); + ROCKSDB_NAMESPACE::RocksDBExceptionJni::ThrowNew( + env, "Unable to get Direct Buffer Address"); + env->ExceptionDescribe(); // print out exception to stderr + releaseJniEnv(attached_thread); + return; + } + start->assign(static_cast(start_buf), jstart_len); + + } else { + // reused non-direct buffer + copy_from_non_direct = true; + } + } else { + // there was a new buffer + if (m_options->direct_buffer) { + // it was direct... don't forget to potentially truncate the `start` + // string + start->resize(jstart_len); + } else { + // it was non-direct + copy_from_non_direct = true; + } + } + + if (copy_from_non_direct) { + jbyteArray jarray = + ByteBufferJni::array(env, j_start_buf, m_jbytebuffer_clazz); + if (jarray == nullptr) { + if (!reuse_jbuf_start) { + DeleteBuffer(env, j_start_buf); + } + if (!reuse_jbuf_limit) { + DeleteBuffer(env, j_limit_buf); + } + MaybeUnlockForReuse(mtx_shortest, reuse_jbuf_start || reuse_jbuf_limit); + env->ExceptionDescribe(); // print out exception to stderr + releaseJniEnv(attached_thread); + return; + } + jboolean has_exception = JNI_FALSE; + JniUtil::byteString( + env, jarray, + [start, jstart_len](const char* data, const size_t) { + return start->assign(data, static_cast(jstart_len)); + }, + &has_exception); + env->DeleteLocalRef(jarray); + if (has_exception == JNI_TRUE) { + if (!reuse_jbuf_start) { + DeleteBuffer(env, j_start_buf); + } + if (!reuse_jbuf_limit) { + DeleteBuffer(env, j_limit_buf); + } + env->ExceptionDescribe(); // print out exception to stderr + MaybeUnlockForReuse(mtx_shortest, reuse_jbuf_start || reuse_jbuf_limit); + releaseJniEnv(attached_thread); + return; + } + } + } + + if (!reuse_jbuf_start) { + DeleteBuffer(env, j_start_buf); + } + if (!reuse_jbuf_limit) { + DeleteBuffer(env, j_limit_buf); + } + + MaybeUnlockForReuse(mtx_shortest, reuse_jbuf_start || reuse_jbuf_limit); + + releaseJniEnv(attached_thread); +} + +void ComparatorJniCallback::FindShortSuccessor(std::string* key) const { + if (key == nullptr) { + return; + } + + jboolean attached_thread = JNI_FALSE; + JNIEnv* env = getJniEnv(&attached_thread); + assert(env != nullptr); + + const bool reuse_jbuf_key = + static_cast(key->length()) <= m_options->max_reused_buffer_size; + + MaybeLockForReuse(mtx_short, reuse_jbuf_key); + + Slice skey(key->data(), key->length()); + jobject j_key_buf = + GetBuffer(env, skey, reuse_jbuf_key, m_tl_buf_a, m_jshort_buf_key); + if (j_key_buf == nullptr) { + // exception occurred + MaybeUnlockForReuse(mtx_short, reuse_jbuf_key); + env->ExceptionDescribe(); // print out exception to stderr + releaseJniEnv(attached_thread); + return; + } + + jint jkey_len = env->CallStaticIntMethod( + m_abstract_comparator_jni_bridge_clazz, m_jshort_mid, m_jcallback_obj, + j_key_buf, reuse_jbuf_key ? key->length() : -1); + + if (env->ExceptionCheck()) { + // exception thrown from CallObjectMethod + if (!reuse_jbuf_key) { + DeleteBuffer(env, j_key_buf); + } + MaybeUnlockForReuse(mtx_short, reuse_jbuf_key); + env->ExceptionDescribe(); // print out exception to stderr + releaseJniEnv(attached_thread); + return; + } + + if (static_cast(jkey_len) != key->length()) { + // key buffer has changed in Java, so update `key` with the result + bool copy_from_non_direct = false; + if (reuse_jbuf_key) { + // reused a buffer + if (m_options->direct_buffer) { + // reused direct buffer + void* key_buf = env->GetDirectBufferAddress(j_key_buf); + if (key_buf == nullptr) { + ROCKSDB_NAMESPACE::RocksDBExceptionJni::ThrowNew( + env, "Unable to get Direct Buffer Address"); + if (!reuse_jbuf_key) { + DeleteBuffer(env, j_key_buf); + } + MaybeUnlockForReuse(mtx_short, reuse_jbuf_key); + env->ExceptionDescribe(); // print out exception to stderr + releaseJniEnv(attached_thread); + return; + } + key->assign(static_cast(key_buf), jkey_len); + } else { + // reused non-direct buffer + copy_from_non_direct = true; + } + } else { + // there was a new buffer + if (m_options->direct_buffer) { + // it was direct... don't forget to potentially truncate the `key` + // string + key->resize(jkey_len); + } else { + // it was non-direct + copy_from_non_direct = true; + } + } + + if (copy_from_non_direct) { + jbyteArray jarray = + ByteBufferJni::array(env, j_key_buf, m_jbytebuffer_clazz); + if (jarray == nullptr) { + if (!reuse_jbuf_key) { + DeleteBuffer(env, j_key_buf); + } + MaybeUnlockForReuse(mtx_short, reuse_jbuf_key); + env->ExceptionDescribe(); // print out exception to stderr + releaseJniEnv(attached_thread); + return; + } + jboolean has_exception = JNI_FALSE; + JniUtil::byteString( + env, jarray, + [key, jkey_len](const char* data, const size_t) { + return key->assign(data, static_cast(jkey_len)); + }, + &has_exception); + env->DeleteLocalRef(jarray); + if (has_exception == JNI_TRUE) { + if (!reuse_jbuf_key) { + DeleteBuffer(env, j_key_buf); + } + MaybeUnlockForReuse(mtx_short, reuse_jbuf_key); + env->ExceptionDescribe(); // print out exception to stderr + releaseJniEnv(attached_thread); + return; + } + } + } + + if (!reuse_jbuf_key) { + DeleteBuffer(env, j_key_buf); + } + + MaybeUnlockForReuse(mtx_short, reuse_jbuf_key); + + releaseJniEnv(attached_thread); +} + +inline void ComparatorJniCallback::MaybeLockForReuse( + const std::unique_ptr& mutex, const bool cond) const { + // no need to lock if using thread_local + if (m_options->reused_synchronisation_type != + ReusedSynchronisationType::THREAD_LOCAL && + cond) { + mutex.get()->Lock(); + } +} + +inline void ComparatorJniCallback::MaybeUnlockForReuse( + const std::unique_ptr& mutex, const bool cond) const { + // no need to unlock if using thread_local + if (m_options->reused_synchronisation_type != + ReusedSynchronisationType::THREAD_LOCAL && + cond) { + mutex.get()->Unlock(); + } +} + +jobject ComparatorJniCallback::GetBuffer(JNIEnv* env, const Slice& src, + bool reuse_buffer, + ThreadLocalPtr* tl_buf, + jobject jreuse_buffer) const { + if (reuse_buffer) { + if (m_options->reused_synchronisation_type == + ReusedSynchronisationType::THREAD_LOCAL) { + // reuse thread-local bufffer + ThreadLocalBuf* tlb = reinterpret_cast(tl_buf->Get()); + if (tlb == nullptr) { + // thread-local buffer has not yet been created, so create it + jobject jtl_buf = env->NewGlobalRef(ByteBufferJni::construct( + env, m_options->direct_buffer, m_options->max_reused_buffer_size, + m_jbytebuffer_clazz)); + if (jtl_buf == nullptr) { + // exception thrown: OutOfMemoryError + return nullptr; + } + tlb = new ThreadLocalBuf(m_jvm, m_options->direct_buffer, jtl_buf); + tl_buf->Reset(tlb); + } + return ReuseBuffer(env, src, tlb->jbuf); + } else { + // reuse class member buffer + return ReuseBuffer(env, src, jreuse_buffer); + } + } else { + // new buffer + return NewBuffer(env, src); + } +} + +jobject ComparatorJniCallback::ReuseBuffer(JNIEnv* env, const Slice& src, + jobject jreuse_buffer) const { + // we can reuse the buffer + if (m_options->direct_buffer) { + // copy into direct buffer + void* buf = env->GetDirectBufferAddress(jreuse_buffer); + if (buf == nullptr) { + // either memory region is undefined, given object is not a direct + // java.nio.Buffer, or JNI access to direct buffers is not supported by + // this virtual machine. + ROCKSDB_NAMESPACE::RocksDBExceptionJni::ThrowNew( + env, "Unable to get Direct Buffer Address"); + return nullptr; + } + memcpy(buf, src.data(), src.size()); + } else { + // copy into non-direct buffer + const jbyteArray jarray = + ByteBufferJni::array(env, jreuse_buffer, m_jbytebuffer_clazz); + if (jarray == nullptr) { + // exception occurred + return nullptr; + } + env->SetByteArrayRegion( + jarray, 0, static_cast(src.size()), + const_cast(reinterpret_cast(src.data()))); + if (env->ExceptionCheck()) { + // exception occurred + env->DeleteLocalRef(jarray); + return nullptr; + } + env->DeleteLocalRef(jarray); + } + return jreuse_buffer; +} + +jobject ComparatorJniCallback::NewBuffer(JNIEnv* env, const Slice& src) const { + // we need a new buffer + jobject jbuf = + ByteBufferJni::constructWith(env, m_options->direct_buffer, src.data(), + src.size(), m_jbytebuffer_clazz); + if (jbuf == nullptr) { + // exception occurred + return nullptr; + } + return jbuf; +} + +void ComparatorJniCallback::DeleteBuffer(JNIEnv* env, jobject jbuffer) const { + env->DeleteLocalRef(jbuffer); +} + +} // namespace ROCKSDB_NAMESPACE -- cgit v1.2.3