diff options
Diffstat (limited to '')
13 files changed, 2672 insertions, 0 deletions
diff --git a/storage/mroonga/vendor/groonga/plugins/sharding.rb b/storage/mroonga/vendor/groonga/plugins/sharding.rb new file mode 100644 index 00000000..86401c1f --- /dev/null +++ b/storage/mroonga/vendor/groonga/plugins/sharding.rb @@ -0,0 +1,11 @@ +require "sharding/parameters" +require "sharding/range_expression_builder" +require "sharding/logical_enumerator" + +require "sharding/logical_parameters" + +require "sharding/logical_count" +require "sharding/logical_range_filter" +require "sharding/logical_select" +require "sharding/logical_shard_list" +require "sharding/logical_table_remove" diff --git a/storage/mroonga/vendor/groonga/plugins/sharding/CMakeLists.txt b/storage/mroonga/vendor/groonga/plugins/sharding/CMakeLists.txt new file mode 100644 index 00000000..1131520f --- /dev/null +++ b/storage/mroonga/vendor/groonga/plugins/sharding/CMakeLists.txt @@ -0,0 +1,24 @@ +# Copyright(C) 2015 Brazil +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License version 2.1 as published by the Free Software Foundation. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. 
+# +# You should have received a copy of the GNU Lesser General Public +# License along with this library; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA + +if(NOT GRN_EMBED) + if(GRN_WITH_MRUBY) + set(GRN_RELATIVE_SHARDING_PLUGINS_DIR "${GRN_RELATIVE_PLUGINS_DIR}/sharding") + + read_file_list(${CMAKE_CURRENT_SOURCE_DIR}/sources.am SHARDING_SCRIPTS) + install(FILES ${SHARDING_SCRIPTS} + DESTINATION "${GRN_RELATIVE_SHARDING_PLUGINS_DIR}") + endif() +endif() diff --git a/storage/mroonga/vendor/groonga/plugins/sharding/Makefile.am b/storage/mroonga/vendor/groonga/plugins/sharding/Makefile.am new file mode 100644 index 00000000..8104ab6d --- /dev/null +++ b/storage/mroonga/vendor/groonga/plugins/sharding/Makefile.am @@ -0,0 +1,9 @@ +EXTRA_DIST = \ + CMakeLists.txt + +if WITH_MRUBY +dist_sharding_plugins_DATA = \ + $(sharding_scripts) +endif + +include sources.am diff --git a/storage/mroonga/vendor/groonga/plugins/sharding/logical_count.rb b/storage/mroonga/vendor/groonga/plugins/sharding/logical_count.rb new file mode 100644 index 00000000..8bdd77ef --- /dev/null +++ b/storage/mroonga/vendor/groonga/plugins/sharding/logical_count.rb @@ -0,0 +1,169 @@ +module Groonga + module Sharding + class LogicalCountCommand < Command + register("logical_count", + [ + "logical_table", + "shard_key", + "min", + "min_border", + "max", + "max_border", + "filter", + ]) + + def run_body(input) + enumerator = LogicalEnumerator.new("logical_count", input) + filter = input[:filter] + + total = 0 + enumerator.each do |shard, shard_range| + total += count_n_records(filter, shard, shard_range, + enumerator.target_range) + end + writer.write(total) + end + + private + def cache_key(input) + key = "logical_count\0" + key << "#{input[:logical_table]}\0" + key << "#{input[:shard_key]}\0" + key << "#{input[:min]}\0" + key << "#{input[:min_border]}\0" + key << "#{input[:max]}\0" + key << "#{input[:max_border]}\0" + key << 
"#{input[:filter]}\0" + key + end + + def log_use_range_index(use, table_name, line, method) + message = "[logical_count]" + if use + message << "[range-index]" + else + message << "[select]" + end + message << " <#{table_name}>" + Context.instance.logger.log(Logger::Level::DEBUG, + __FILE__, + line, + method.to_s, + message) + end + + def count_n_records(filter, shard, shard_range, target_range) + cover_type = target_range.cover_type(shard_range) + return 0 if cover_type == :none + + shard_key = shard.key + if shard_key.nil? + message = "[logical_count] shard_key doesn't exist: " + + "<#{shard.key_name}>" + raise InvalidArgument, message + end + table = shard.table + table_name = shard.table_name + + expression_builder = RangeExpressionBuilder.new(shard_key, + target_range) + expression_builder.filter = filter + if cover_type == :all + log_use_range_index(false, table_name, __LINE__, __method__) + if filter.nil? + return table.size + else + return filtered_count_n_records(table) do |expression| + expression_builder.build_all(expression) + end + end + end + + range_index = nil + if filter.nil? + index_info = shard_key.find_index(Operator::LESS) + if index_info + range_index = index_info.index + end + end + + use_range_index = (!range_index.nil?) 
+ log_use_range_index(use_range_index, table_name, __LINE__, __method__) + + case cover_type + when :partial_min + if range_index + count_n_records_in_range(range_index, + target_range.min, target_range.min_border, + nil, nil) + else + filtered_count_n_records(table) do |expression| + expression_builder.build_partial_min(expression) + end + end + when :partial_max + if range_index + count_n_records_in_range(range_index, + nil, nil, + target_range.max, target_range.max_border) + else + filtered_count_n_records(table) do |expression| + expression_builder.build_partial_max(expression) + end + end + when :partial_min_and_max + if range_index + count_n_records_in_range(range_index, + target_range.min, target_range.min_border, + target_range.max, target_range.max_border) + else + filtered_count_n_records(table) do |expression| + expression_builder.build_partial_min_and_max(expression) + end + end + end + end + + def filtered_count_n_records(table) + expression = nil + filtered_table = nil + + begin + expression = Expression.create(table) + yield(expression) + filtered_table = table.select(expression) + filtered_table.size + ensure + filtered_table.close if filtered_table + expression.close if expression + end + end + + def count_n_records_in_range(range_index, + min, min_border, max, max_border) + flags = TableCursorFlags::BY_KEY + case min_border + when :include + flags |= TableCursorFlags::GE + when :exclude + flags |= TableCursorFlags::GT + end + case max_border + when :include + flags |= TableCursorFlags::LE + when :exclude + flags |= TableCursorFlags::LT + end + + TableCursor.open(range_index.table, + :min => min, + :max => max, + :flags => flags) do |table_cursor| + IndexCursor.open(table_cursor, range_index) do |index_cursor| + index_cursor.count + end + end + end + end + end +end diff --git a/storage/mroonga/vendor/groonga/plugins/sharding/logical_enumerator.rb b/storage/mroonga/vendor/groonga/plugins/sharding/logical_enumerator.rb new file mode 100644 index 
00000000..d05a220f --- /dev/null +++ b/storage/mroonga/vendor/groonga/plugins/sharding/logical_enumerator.rb @@ -0,0 +1,317 @@ +module Groonga + module Sharding + class LogicalEnumerator + include Enumerable + + attr_reader :target_range + attr_reader :logical_table + attr_reader :shard_key_name + def initialize(command_name, input, options={}) + @command_name = command_name + @input = input + @options = options + initialize_parameters + end + + def each(&block) + each_internal(:ascending, &block) + end + + def reverse_each(&block) + each_internal(:descending, &block) + end + + private + def each_internal(order) + context = Context.instance + each_shard_with_around(order) do |prev_shard, current_shard, next_shard| + shard_range_data = current_shard.range_data + shard_range = nil + + if shard_range_data.day.nil? + if order == :ascending + if next_shard + next_shard_range_data = next_shard.range_data + else + next_shard_range_data = nil + end + else + if prev_shard + next_shard_range_data = prev_shard.range_data + else + next_shard_range_data = nil + end + end + max_day = compute_month_shard_max_day(shard_range_data.year, + shard_range_data.month, + next_shard_range_data) + shard_range = MonthShardRange.new(shard_range_data.year, + shard_range_data.month, + max_day) + else + shard_range = DayShardRange.new(shard_range_data.year, + shard_range_data.month, + shard_range_data.day) + end + + yield(current_shard, shard_range) + end + end + + def each_shard_with_around(order) + context = Context.instance + prefix = "#{@logical_table}_" + + shards = [nil] + context.database.each_name(:prefix => prefix, + :order_by => :key, + :order => order) do |name| + shard_range_raw = name[prefix.size..-1] + + case shard_range_raw + when /\A(\d{4})(\d{2})\z/ + shard_range_data = ShardRangeData.new($1.to_i, $2.to_i, nil) + when /\A(\d{4})(\d{2})(\d{2})\z/ + shard_range_data = ShardRangeData.new($1.to_i, $2.to_i, $3.to_i) + else + next + end + + shards << Shard.new(name, @shard_key_name, 
shard_range_data) + next if shards.size < 3 + yield(*shards) + shards.shift + end + + if shards.size == 2 + yield(shards[0], shards[1], nil) + end + end + + private + def initialize_parameters + @logical_table = @input[:logical_table] + if @logical_table.nil? + raise InvalidArgument, "[#{@command_name}] logical_table is missing" + end + + @shard_key_name = @input[:shard_key] + if @shard_key_name.nil? + require_shard_key = @options[:require_shard_key] + require_shard_key = true if require_shard_key.nil? + if require_shard_key + raise InvalidArgument, "[#{@command_name}] shard_key is missing" + end + end + + @target_range = TargetRange.new(@command_name, @input) + end + + def compute_month_shard_max_day(year, month, next_shard_range) + return nil if next_shard_range.nil? + + return nil if month != next_shard_range.month + + next_shard_range.day + end + + class Shard + attr_reader :table_name, :key_name, :range_data + def initialize(table_name, key_name, range_data) + @table_name = table_name + @key_name = key_name + @range_data = range_data + end + + def table + @table ||= Context.instance[@table_name] + end + + def full_key_name + "#{@table_name}.#{@key_name}" + end + + def key + @key ||= Context.instance[full_key_name] + end + end + + class ShardRangeData + attr_reader :year, :month, :day + def initialize(year, month, day) + @year = year + @month = month + @day = day + end + + def to_suffix + if @day.nil? 
+ "_%04d%02d" % [@year, @month] + else + "_%04d%02d%02d" % [@year, @month, @day] + end + end + end + + class DayShardRange + attr_reader :year, :month, :day + def initialize(year, month, day) + @year = year + @month = month + @day = day + end + + def least_over_time + next_day = Time.local(@year, @month, @day) + (60 * 60 * 24) + while next_day.day == @day # For leap second + next_day += 1 + end + next_day + end + + def min_time + Time.local(@year, @month, @day) + end + + def include?(time) + @year == time.year and + @month == time.month and + @day == time.day + end + end + + class MonthShardRange + attr_reader :year, :month, :max_day + def initialize(year, month, max_day) + @year = year + @month = month + @max_day = max_day + end + + def least_over_time + if @max_day.nil? + if @month == 12 + Time.local(@year + 1, 1, 1) + else + Time.local(@year, @month + 1, 1) + end + else + Time.local(@year, @month, @max_day) + end + end + + def min_time + Time.local(@year, @month, 1) + end + + def include?(time) + return false unless @year == time.year + return false unless @month == time.month + + if @max_day.nil? + true + else + time.day <= @max_day + end + end + end + + class TargetRange + attr_reader :min, :min_border + attr_reader :max, :max_border + def initialize(command_name, input) + @command_name = command_name + @input = input + @min = parse_value(:min) + @min_border = parse_border(:min_border) + @max = parse_value(:max) + @max_border = parse_border(:max_border) + end + + def cover_type(shard_range) + return :all if @min.nil? and @max.nil? 
+ + if @min and @max + return :none unless in_min?(shard_range) + return :none unless in_max?(shard_range) + min_partial_p = in_min_partial?(shard_range) + max_partial_p = in_max_partial?(shard_range) + if min_partial_p and max_partial_p + :partial_min_and_max + elsif min_partial_p + :partial_min + elsif max_partial_p + :partial_max + else + :all + end + elsif @min + return :none unless in_min?(shard_range) + if in_min_partial?(shard_range) + :partial_min + else + :all + end + else + return :none unless in_max?(shard_range) + if in_max_partial?(shard_range) + :partial_max + else + :all + end + end + end + + private + def parse_value(name) + value = @input[name] + return nil if value.nil? + + Converter.convert(value, Time) + end + + def parse_border(name) + border = @input[name] + return :include if border.nil? + + case border + when "include" + :include + when "exclude" + :exclude + else + message = + "[#{@command_name}] #{name} must be \"include\" or \"exclude\": " + + "<#{border}>" + raise InvalidArgument, message + end + end + + def in_min?(shard_range) + @min < shard_range.least_over_time + end + + def in_min_partial?(shard_range) + return false unless shard_range.include?(@min) + + return true if @min_border == :exclude + + shard_range.min_time != @min + end + + def in_max?(shard_range) + max_base_time = shard_range.min_time + if @max_border == :include + @max >= max_base_time + else + @max > max_base_time + end + end + + def in_max_partial?(shard_range) + shard_range.include?(@max) + end + end + end + end +end diff --git a/storage/mroonga/vendor/groonga/plugins/sharding/logical_parameters.rb b/storage/mroonga/vendor/groonga/plugins/sharding/logical_parameters.rb new file mode 100644 index 00000000..75ff569b --- /dev/null +++ b/storage/mroonga/vendor/groonga/plugins/sharding/logical_parameters.rb @@ -0,0 +1,44 @@ +module Groonga + module Sharding + class LogicalParametersCommand < Command + register("logical_parameters", + [ + "range_index", + ]) + + def 
run_body(input) + range_index = parse_range_index(input[:range_index]) + + parameters = [ + :range_index, + ] + writer.map("parameters", parameters.size) do + parameters.each do |name| + writer.write(name.to_s) + writer.write(Parameters.__send__(name)) + end + end + + Parameters.range_index = range_index if range_index + end + + private + def parse_range_index(value) + case value + when nil + nil + when "auto" + :auto + when "always" + :always + when "never" + :never + else + message = "[logical_parameters][range_index] " + message << "must be auto, always or never: <#{value}>" + raise InvalidArgument, message + end + end + end + end +end diff --git a/storage/mroonga/vendor/groonga/plugins/sharding/logical_range_filter.rb b/storage/mroonga/vendor/groonga/plugins/sharding/logical_range_filter.rb new file mode 100644 index 00000000..1c8f8644 --- /dev/null +++ b/storage/mroonga/vendor/groonga/plugins/sharding/logical_range_filter.rb @@ -0,0 +1,642 @@ +module Groonga + module Sharding + class LogicalRangeFilterCommand < Command + register("logical_range_filter", + [ + "logical_table", + "shard_key", + "min", + "min_border", + "max", + "max_border", + "order", + "filter", + "offset", + "limit", + "output_columns", + "use_range_index", + ]) + + def run_body(input) + output_columns = input[:output_columns] || "_key, *" + + context = ExecuteContext.new(input) + begin + executor = Executor.new(context) + executor.execute + + result_sets = context.result_sets + n_elements = 1 # for columns + result_sets.each do |result_set| + n_elements += result_set.size + end + + writer.array("RESULTSET", n_elements) do + first_result_set = result_sets.first + if first_result_set + writer.write_table_columns(first_result_set, output_columns) + end + limit = context.limit + if limit < 0 + n_records = result_sets.inject(0) do |n, result_set| + n + result_set.size + end + limit = n_records + limit + 1 + end + options = {} + result_sets.each do |result_set| + options[:limit] = limit + 
writer.write_table_records(result_set, output_columns, options) + limit -= result_set.size + break if limit <= 0 + end + end + ensure + context.close + end + end + + private + def cache_key(input) + key = "logical_range_filter\0" + key << "#{input[:logical_table]}\0" + key << "#{input[:shard_key]}\0" + key << "#{input[:min]}\0" + key << "#{input[:min_border]}\0" + key << "#{input[:max]}\0" + key << "#{input[:max_border]}\0" + key << "#{input[:order]}\0" + key << "#{input[:filter]}\0" + key << "#{input[:offset]}\0" + key << "#{input[:limit]}\0" + key << "#{input[:output_columns]}\0" + key << "#{input[:use_range_index]}\0" + key + end + + class ExecuteContext + attr_reader :use_range_index + attr_reader :enumerator + attr_reader :order + attr_reader :filter + attr_reader :offset + attr_reader :limit + attr_accessor :current_offset + attr_accessor :current_limit + attr_reader :result_sets + attr_reader :unsorted_result_sets + attr_reader :threshold + def initialize(input) + @input = input + @use_range_index = parse_use_range_index(@input[:use_range_index]) + @enumerator = LogicalEnumerator.new("logical_range_filter", @input) + @order = parse_order(@input, :order) + @filter = @input[:filter] + @offset = (@input[:offset] || 0).to_i + @limit = (@input[:limit] || 10).to_i + + @current_offset = @offset + @current_limit = @limit + + @result_sets = [] + @unsorted_result_sets = [] + + @threshold = compute_threshold + end + + def close + @unsorted_result_sets.each do |result_set| + result_set.close if result_set.temporary? + end + @result_sets.each do |result_set| + result_set.close if result_set.temporary? + end + end + + private + def parse_use_range_index(use_range_index) + case use_range_index + when "yes" + true + when "no" + false + else + nil + end + end + + def parse_order(input, name) + order = input[name] + return :ascending if order.nil? 
+ + case order + when "ascending" + :ascending + when "descending" + :descending + else + message = + "[logical_range_filter] #{name} must be " + + "\"ascending\" or \"descending\": <#{order}>" + raise InvalidArgument, message + end + end + + def compute_threshold + threshold_env = ENV["GRN_LOGICAL_RANGE_FILTER_THRESHOLD"] + default_threshold = 0.2 + (threshold_env || default_threshold).to_f + end + end + + class Executor + def initialize(context) + @context = context + end + + def execute + first_shard = nil + enumerator = @context.enumerator + target_range = enumerator.target_range + if @context.order == :descending + each_method = :reverse_each + else + each_method = :each + end + enumerator.send(each_method) do |shard, shard_range| + first_shard ||= shard + shard_executor = ShardExecutor.new(@context, shard, shard_range) + shard_executor.execute + break if @context.current_limit == 0 + end + if first_shard.nil? + message = + "[logical_range_filter] no shard exists: " + + "logical_table: <#{enumerator.logical_table}>: " + + "shard_key: <#{enumerator.shard_key_name}>" + raise InvalidArgument, message + end + if @context.result_sets.empty? + result_set = HashTable.create(:flags => ObjectFlags::WITH_SUBREC, + :key_type => first_shard.table) + @context.result_sets << result_set + end + end + end + + class ShardExecutor + def initialize(context, shard, shard_range) + @context = context + @shard = shard + @shard_range = shard_range + + @filter = @context.filter + @result_sets = @context.result_sets + @unsorted_result_sets = @context.unsorted_result_sets + + @target_range = @context.enumerator.target_range + + @cover_type = @target_range.cover_type(@shard_range) + end + + def execute + return if @cover_type == :none + return if @shard.table.empty? + + shard_key = @shard.key + if shard_key.nil? 
+ message = "[logical_range_filter] shard_key doesn't exist: " + + "<#{@shard.key_name}>" + raise InvalidArgument, message + end + + expression_builder = RangeExpressionBuilder.new(shard_key, + @target_range) + expression_builder.filter = @filter + + index_info = shard_key.find_index(Operator::LESS) + if index_info + range_index = index_info.index + unless use_range_index?(range_index, expression_builder) + range_index = nil + end + else + range_index = nil + end + + execute_filter(range_index, expression_builder) + end + + private + def decide_use_range_index(use, reason, line, method) + message = "[logical_range_filter]" + if use + message << "[range-index] " + else + message << "[select] " + end + message << "<#{@shard.table_name}>: " + message << reason + Context.instance.logger.log(Logger::Level::DEBUG, + __FILE__, + line, + method.to_s, + message) + + use + end + + def use_range_index?(range_index, expression_builder) + use_range_index_parameter_message = + "force by use_range_index parameter" + case @context.use_range_index + when true + return decide_use_range_index(true, + use_range_index_parameter_message, + __LINE__, __method__) + when false + return decide_use_range_index(false, + use_range_index_parameter_message, + __LINE__, __method__) + end + + range_index_logical_parameter_message = + "force by range_index logical parameter" + case Parameters.range_index + when :always + return decide_use_range_index(true, + range_index_logical_parameter_message, + __LINE__, __method__) + when :never + return decide_use_range_index(false, + range_index_logical_parameter_message, + __LINE__, __method__) + end + + current_limit = @context.current_limit + if current_limit < 0 + reason = "limit is negative: <#{current_limit}>" + return decide_use_range_index(false, reason, + __LINE__, __method__) + end + + required_n_records = @context.current_offset + current_limit + max_n_records = @shard.table.size + if max_n_records <= required_n_records + reason = "the number of 
required records (#{required_n_records}) " + reason << ">= " + reason << "the number of records in shard (#{max_n_records})" + return decide_use_range_index(false, reason, + __LINE__, __method__) + end + + threshold = @context.threshold + if threshold <= 0.0 + reason = "threshold is negative: <#{threshold}>" + return decide_use_range_index(true, reason, + __LINE__, __method__) + end + if threshold >= 1.0 + reason = "threshold (#{threshold}) >= 1.0" + return decide_use_range_index(false, reason, + __LINE__, __method__) + end + + table = @shard.table + estimated_n_records = 0 + case @cover_type + when :all + if @filter + create_expression(table) do |expression| + expression_builder.build_all(expression) + unless range_index_available_expression?(expression, + __LINE__, __method__) + return false + end + estimated_n_records = expression.estimate_size(table) + end + else + estimated_n_records = max_n_records + end + when :partial_min + create_expression(table) do |expression| + expression_builder.build_partial_min(expression) + unless range_index_available_expression?(expression, + __LINE__, __method__) + return false + end + estimated_n_records = expression.estimate_size(table) + end + when :partial_max + create_expression(table) do |expression| + expression_builder.build_partial_max(expression) + unless range_index_available_expression?(expression, + __LINE__, __method__) + return false + end + estimated_n_records = expression.estimate_size(table) + end + when :partial_min_and_max + create_expression(table) do |expression| + expression_builder.build_partial_min_and_max(expression) + unless range_index_available_expression?(expression, + __LINE__, __method__) + return false + end + estimated_n_records = expression.estimate_size(table) + end + end + + if estimated_n_records <= required_n_records + reason = "the number of required records (#{required_n_records}) " + reason << ">= " + reason << "the number of estimated records (#{estimated_n_records})" + return 
decide_use_range_index(false, reason, + __LINE__, __method__) + end + + hit_ratio = estimated_n_records / max_n_records.to_f + use_range_index_by_hit_ratio = (hit_ratio >= threshold) + if use_range_index_by_hit_ratio + relation = ">=" + else + relation = "<" + end + reason = "hit ratio " + reason << "(#{hit_ratio}=#{estimated_n_records}/#{max_n_records}) " + reason << "#{relation} threshold (#{threshold})" + decide_use_range_index(use_range_index_by_hit_ratio, reason, + __LINE__, __method__) + end + + def range_index_available_expression?(expression, line, method_name) + nested_reference_vector_column_accessor = + find_nested_reference_vector_column_accessor(expression) + if nested_reference_vector_column_accessor + reason = "nested reference vector column accessor can't be used: " + reason << "<#{nested_reference_vector_column_accessor.name}>" + return decide_use_range_index(false, reason, line, method_name) + end + + selector_only_procedure = find_selector_only_procedure(expression) + if selector_only_procedure + reason = "selector only procedure can't be used: " + reason << "<#{selector_only_procedure.name}>" + return decide_use_range_index(false, reason, line, method_name) + end + + true + end + + def find_nested_reference_vector_column_accessor(expression) + expression.codes.each do |code| + value = code.value + next unless value.is_a?(Accessor) + + sub_accessor = value + while sub_accessor.have_next? + object = sub_accessor.object + return value if object.is_a?(Column) and object.vector? + sub_accessor = sub_accessor.next + end + end + nil + end + + def find_selector_only_procedure(expression) + expression.codes.each do |code| + value = code.value + return value if value.is_a?(Procedure) and value.selector_only? 
+ end + nil + end + + def execute_filter(range_index, expression_builder) + case @cover_type + when :all + filter_shard_all(range_index, expression_builder) + when :partial_min + if range_index + filter_by_range(range_index, expression_builder, + @target_range.min, @target_range.min_border, + nil, nil) + else + filter_table do |expression| + expression_builder.build_partial_min(expression) + end + end + when :partial_max + if range_index + filter_by_range(range_index, expression_builder, + nil, nil, + @target_range.max, @target_range.max_border) + else + filter_table do |expression| + expression_builder.build_partial_max(expression) + end + end + when :partial_min_and_max + if range_index + filter_by_range(range_index, expression_builder, + @target_range.min, @target_range.min_border, + @target_range.max, @target_range.max_border) + else + filter_table do |expression| + expression_builder.build_partial_min_and_max(expression) + end + end + end + end + + def filter_shard_all(range_index, expression_builder) + table = @shard.table + if @filter.nil? 
+ if table.size <= @context.current_offset + @context.current_offset -= table.size + return + end + if range_index + filter_by_range(range_index, expression_builder, + nil, nil, + nil, nil) + else + sort_result_set(table) + end + else + if range_index + filter_by_range(range_index, expression_builder, + nil, nil, + nil, nil) + else + filter_table do |expression| + expression_builder.build_all(expression) + end + end + end + end + + def create_expression(table) + expression = Expression.create(table) + begin + yield(expression) + ensure + expression.close + end + end + + def filter_by_range(range_index, expression_builder, + min, min_border, max, max_border) + lexicon = range_index.domain + data_table = range_index.range + flags = build_range_search_flags(min_border, max_border) + + result_set = HashTable.create(:flags => ObjectFlags::WITH_SUBREC, + :key_type => data_table) + n_matched_records = 0 + begin + TableCursor.open(lexicon, + :min => min, + :max => max, + :flags => flags) do |table_cursor| + options = { + :offset => @context.current_offset, + } + current_limit = @context.current_limit + if current_limit < 0 + options[:limit] = data_table.size + else + options[:limit] = current_limit + end + max_n_unmatched_records = + compute_max_n_unmatched_records(data_table.size, + options[:limit]) + options[:max_n_unmatched_records] = max_n_unmatched_records + if @filter + create_expression(data_table) do |expression| + expression.parse(@filter) + options[:expression] = expression + IndexCursor.open(table_cursor, range_index) do |index_cursor| + n_matched_records = index_cursor.select(result_set, options) + end + end + else + IndexCursor.open(table_cursor, range_index) do |index_cursor| + n_matched_records = index_cursor.select(result_set, options) + end + end + if n_matched_records == -1 + result_set.close + fallback_message = + "fallback because there are too much unmatched records: " + fallback_message << "<#{max_n_unmatched_records}>" + decide_use_range_index(false, 
+ fallback_message, + __LINE__, __method__) + execute_filter(nil, expression_builder) + return + end + end + rescue + result_set.close + raise + end + + if n_matched_records <= @context.current_offset + @context.current_offset -= n_matched_records + result_set.close + return + end + + if @context.current_offset > 0 + @context.current_offset = 0 + end + if @context.current_limit > 0 + @context.current_limit -= result_set.size + end + @result_sets << result_set + end + + def build_range_search_flags(min_border, max_border) + flags = TableCursorFlags::BY_KEY + case @context.order + when :ascending + flags |= TableCursorFlags::ASCENDING + when :descending + flags |= TableCursorFlags::DESCENDING + end + case min_border + when :include + flags |= TableCursorFlags::GE + when :exclude + flags |= TableCursorFlags::GT + end + case max_border + when :include + flags |= TableCursorFlags::LE + when :exclude + flags |= TableCursorFlags::LT + end + flags + end + + def compute_max_n_unmatched_records(data_table_size, limit) + max_n_unmatched_records = limit * 100 + max_n_sample_records = data_table_size + if max_n_sample_records > 10000 + sample_ratio = 1 / (Math.log(data_table_size) ** 2) + max_n_sample_records = (max_n_sample_records * sample_ratio).ceil + end + if max_n_unmatched_records > max_n_sample_records + max_n_unmatched_records = max_n_sample_records + end + max_n_unmatched_records + end + + def filter_table + table = @shard.table + create_expression(table) do |expression| + yield(expression) + result_set = table.select(expression) + sort_result_set(result_set) + end + end + + def sort_result_set(result_set) + if result_set.empty? + result_set.close if result_set.temporary? + return + end + + if result_set.size <= @context.current_offset + @context.current_offset -= result_set.size + result_set.close if result_set.temporary? + return + end + + @unsorted_result_sets << result_set if result_set.temporary? 
+ sort_keys = [ + { + :key => @context.enumerator.shard_key_name, + :order => @context.order, + }, + ] + if @context.current_limit > 0 + limit = @context.current_limit + else + limit = result_set.size + end + sorted_result_set = result_set.sort(sort_keys, + :offset => @context.current_offset, + :limit => limit) + @result_sets << sorted_result_set + if @context.current_offset > 0 + @context.current_offset = 0 + end + if @context.current_limit > 0 + @context.current_limit -= sorted_result_set.size + end + end + end + end + end +end diff --git a/storage/mroonga/vendor/groonga/plugins/sharding/logical_select.rb b/storage/mroonga/vendor/groonga/plugins/sharding/logical_select.rb new file mode 100644 index 00000000..07ebf9e8 --- /dev/null +++ b/storage/mroonga/vendor/groonga/plugins/sharding/logical_select.rb @@ -0,0 +1,975 @@ +module Groonga + module Sharding + class LogicalSelectCommand < Command + register("logical_select", + [ + "logical_table", + "shard_key", + "min", + "min_border", + "max", + "max_border", + "filter", + # Deprecated since 6.1.5. Use sort_keys instead. + "sortby", + "output_columns", + "offset", + "limit", + "drilldown", + # Deprecated since 6.1.5. Use drilldown_sort_keys instead. 
+ "drilldown_sortby", + "drilldown_output_columns", + "drilldown_offset", + "drilldown_limit", + "drilldown_calc_types", + "drilldown_calc_target", + "sort_keys", + "drilldown_sort_keys", + "match_columns", + "query", + "drilldown_filter", + ]) + + def run_body(input) + context = ExecuteContext.new(input) + begin + executor = Executor.new(context) + executor.execute + + n_results = 1 + n_plain_drilldowns = context.plain_drilldown.n_result_sets + n_labeled_drilldowns = context.labeled_drilldowns.n_result_sets + if n_plain_drilldowns > 0 + n_results += n_plain_drilldowns + elsif + if n_labeled_drilldowns > 0 + n_results += 1 + end + end + + writer.array("RESULT", n_results) do + write_records(writer, context) + if n_plain_drilldowns > 0 + write_plain_drilldowns(writer, context) + elsif n_labeled_drilldowns > 0 + write_labeled_drilldowns(writer, context) + end + end + ensure + context.close + end + end + + private + def cache_key(input) + sort_keys = input[:sort_keys] || input[:sortby] + drilldown_sort_keys = + input[:drilldown_sort_keys] || input[:drilldown_sortby] + key = "logical_select\0" + key << "#{input[:logical_table]}\0" + key << "#{input[:shard_key]}\0" + key << "#{input[:min]}\0" + key << "#{input[:min_border]}\0" + key << "#{input[:max]}\0" + key << "#{input[:max_border]}\0" + key << "#{input[:filter]}\0" + key << "#{sort_keys}\0" + key << "#{input[:output_columns]}\0" + key << "#{input[:offset]}\0" + key << "#{input[:limit]}\0" + key << "#{input[:drilldown]}\0" + key << "#{drilldown_sort_keys}\0" + key << "#{input[:match_columns]}\0" + key << "#{input[:query]}\0" + key << "#{input[:drilldown_output_columns]}\0" + key << "#{input[:drilldown_offset]}\0" + key << "#{input[:drilldown_limit]}\0" + key << "#{input[:drilldown_calc_types]}\0" + key << "#{input[:drilldown_calc_target]}\0" + key << "#{input[:drilldown_filter]}\0" + labeled_drilldowns = LabeledDrilldowns.parse(input).sort_by(&:label) + labeled_drilldowns.each do |drilldown| + key << 
"#{drilldown.label}\0" + key << "#{drilldown.keys.join(',')}\0" + key << "#{drilldown.output_columns}\0" + key << "#{drilldown.offset}\0" + key << "#{drilldown.limit}\0" + key << "#{drilldown.calc_types}\0" + key << "#{drilldown.calc_target_name}\0" + key << "#{drilldown.filter}\0" + cache_key_dynamic_columns(key, drilldown.dynamic_columns) + end + dynamic_columns = DynamicColumns.parse(input) + cache_key_dynamic_columns(key, dynamic_columns) + key + end + + def cache_key_dynamic_columns(key, dynamic_columns) + [ + :initial, + :filtered, + :output + ].each do |stage| + target_dynamic_columns = dynamic_columns.__send__("each_#{stage}").to_a + target_dynamic_columns.sort_by(&:label).each do |dynamic_column| + key << "#{dynamic_column.label}\0" + key << "#{dynamic_column.stage}\0" + key << "#{dynamic_column.type}\0" + key << "#{dynamic_column.flags}\0" + key << "#{dynamic_column.value}\0" + key << "#{dynamic_column.window_sort_keys.join(',')}\0" + key << "#{dynamic_column.window_group_keys.join(',')}\0" + end + end + end + + def write_records(writer, context) + result_sets = context.result_sets + + n_hits = 0 + n_elements = 2 # for N hits and columns + result_sets.each do |result_set| + n_hits += result_set.size + n_elements += result_set.size + end + + output_columns = context.output_columns + + writer.array("RESULTSET", n_elements) do + writer.array("NHITS", 1) do + writer.write(n_hits) + end + first_result_set = result_sets.first + if first_result_set + writer.write_table_columns(first_result_set, output_columns) + end + + current_offset = context.offset + current_offset += n_hits if current_offset < 0 + current_limit = context.limit + current_limit += n_hits + 1 if current_limit < 0 + options = { + :offset => current_offset, + :limit => current_limit, + } + result_sets.each do |result_set| + if result_set.size > current_offset + writer.write_table_records(result_set, output_columns, options) + current_limit -= result_set.size + end + if current_offset > 0 + 
current_offset = [current_offset - result_set.size, 0].max + end + break if current_limit <= 0 + options[:offset] = current_offset + options[:limit] = current_limit + end + end + end + + def write_plain_drilldowns(writer, execute_context) + plain_drilldown = execute_context.plain_drilldown + + drilldowns = plain_drilldown.result_sets + output_columns = plain_drilldown.output_columns + options = { + :offset => plain_drilldown.offset, + :limit => plain_drilldown.limit, + } + + drilldowns.each do |drilldown| + n_elements = 2 # for N hits and columns + n_elements += drilldown.size + writer.array("RESULTSET", n_elements) do + writer.array("NHITS", 1) do + writer.write(drilldown.size) + end + writer.write_table_columns(drilldown, output_columns) + writer.write_table_records(drilldown, output_columns, + options) + end + end + end + + def write_labeled_drilldowns(writer, execute_context) + labeled_drilldowns = execute_context.labeled_drilldowns + is_command_version1 = (context.command_version == 1) + + writer.map("DRILLDOWNS", labeled_drilldowns.n_result_sets) do + labeled_drilldowns.each do |drilldown| + writer.write(drilldown.label) + + result_set = drilldown.result_set + n_elements = 2 # for N hits and columns + n_elements += result_set.size + output_columns = drilldown.output_columns + options = { + :offset => drilldown.offset, + :limit => drilldown.limit, + } + + writer.array("RESULTSET", n_elements) do + writer.array("NHITS", 1) do + writer.write(result_set.size) + end + writer.write_table_columns(result_set, output_columns) + if is_command_version1 and drilldown.need_command_version2? 
+ context.with_command_version(2) do + writer.write_table_records(result_set, + drilldown.output_columns_v2, + options) + end + else + writer.write_table_records(result_set, output_columns, options) + end + end + end + end + end + + class LabeledArgumentParser + def initialize(parameters) + @parameters = parameters + end + + def parse(prefix_pattern) + pattern = /\A#{prefix_pattern}\[(.+?)\]\.(.+)\z/ + labeled_arguments = {} + @parameters.each do |key, value| + match_data = pattern.match(key) + next if match_data.nil? + labeled_argument = (labeled_arguments[match_data[1]] ||= {}) + labeled_argument[match_data[2]] = value + end + labeled_arguments + end + end + + module KeysParsable + private + def parse_keys(raw_keys) + return [] if raw_keys.nil? + + raw_keys.strip.split(/ *, */) + end + end + + module Calculatable + def calc_target(table) + return nil if @calc_target_name.nil? + table.find_column(@calc_target_name) + end + + private + def parse_calc_types(raw_types) + return TableGroupFlags::CALC_COUNT if raw_types.nil? 
+ + types = 0 + raw_types.strip.split(/ *, */).each do |name| + case name + when "COUNT" + types |= TableGroupFlags::CALC_COUNT + when "MAX" + types |= TableGroupFlags::CALC_MAX + when "MIN" + types |= TableGroupFlags::CALC_MIN + when "SUM" + types |= TableGroupFlags::CALC_SUM + when "AVG" + types |= TableGroupFlags::CALC_AVG + when "NONE" + # Do nothing + else + raise InvalidArgument, "invalid drilldown calc type: <#{name}>" + end + end + types + end + end + + class ExecuteContext + include KeysParsable + + attr_reader :enumerator + attr_reader :match_columns + attr_reader :query + attr_reader :filter + attr_reader :offset + attr_reader :limit + attr_reader :sort_keys + attr_reader :output_columns + attr_reader :dynamic_columns + attr_reader :result_sets + attr_reader :unsorted_result_sets + attr_reader :plain_drilldown + attr_reader :labeled_drilldowns + attr_reader :temporary_tables + attr_reader :expressions + def initialize(input) + @input = input + @enumerator = LogicalEnumerator.new("logical_select", @input) + @match_columns = @input[:match_columns] + @query = @input[:query] + @filter = @input[:filter] + @offset = (@input[:offset] || 0).to_i + @limit = (@input[:limit] || 10).to_i + @sort_keys = parse_keys(@input[:sort_keys] || @input[:sortby]) + @output_columns = @input[:output_columns] || "_id, _key, *" + + @dynamic_columns = DynamicColumns.parse(@input) + + @result_sets = [] + @unsorted_result_sets = [] + + @plain_drilldown = PlainDrilldownExecuteContext.new(@input) + @labeled_drilldowns = LabeledDrilldowns.parse(@input) + + @temporary_tables = [] + + @expressions = [] + end + + def close + @result_sets.each do |result_set| + result_set.close if result_set.temporary? + end + @unsorted_result_sets.each do |result_set| + result_set.close if result_set.temporary? 
+ end + + @plain_drilldown.close + @labeled_drilldowns.close + + @dynamic_columns.close + + @temporary_tables.each do |table| + table.close + end + + @expressions.each do |expression| + expression.close + end + end + end + + class DynamicColumns + class << self + def parse(input) + parser = LabeledArgumentParser.new(input) + columns = parser.parse(/columns?/) + + initial_contexts = [] + filtered_contexts = [] + output_contexts = [] + columns.each do |label, parameters| + contexts = nil + case parameters["stage"] + when "initial" + contexts = initial_contexts + when "filtered" + contexts = filtered_contexts + when "output" + contexts = output_contexts + else + next + end + contexts << DynamicColumnExecuteContext.new(label, parameters) + end + + new(initial_contexts, + filtered_contexts, + output_contexts) + end + end + + def initialize(initial_contexts, + filtered_contexts, + output_contexts) + @initial_contexts = initial_contexts + @filtered_contexts = filtered_contexts + @output_contexts = output_contexts + end + + def each_initial(&block) + @initial_contexts.each(&block) + end + + def each_filtered(&block) + @filtered_contexts.each(&block) + end + + def each_output(&block) + @output_contexts.each(&block) + end + + def close + @initial_contexts.each do |context| + context.close + end + @filtered_contexts.each do |context| + context.close + end + @output_contexts.each do |context| + context.close + end + end + end + + class DynamicColumnExecuteContext + include KeysParsable + + attr_reader :label + attr_reader :stage + attr_reader :type + attr_reader :flags + attr_reader :value + attr_reader :window_sort_keys + attr_reader :window_group_keys + def initialize(label, parameters) + @label = label + @stage = parameters["stage"] + @type = parse_type(parameters["type"]) + @flags = parse_flags(parameters["flags"] || "COLUMN_SCALAR") + @value = parameters["value"] + @window_sort_keys = parse_keys(parameters["window.sort_keys"]) + @window_group_keys = 
parse_keys(parameters["window.group_keys"]) + end + + def close + end + + def apply(table, condition=nil) + column = table.create_column(@label, @flags, @type) + return if table.empty? + + expression = Expression.create(table) + begin + expression.parse(@value) + if @window_sort_keys.empty? and @window_group_keys.empty? + expression.condition = condition if condition + table.apply_expression(column, expression) + else + table.apply_window_function(column, expression, + :sort_keys => @window_sort_keys, + :group_keys => @window_group_keys) + end + ensure + expression.close + end + end + + private + def parse_type(type_raw) + return nil if type_raw.nil? + + type = Context.instance[type_raw] + if type.nil? + message = "#{error_message_tag} unknown type: <#{type_raw}>" + raise InvalidArgument, message + end + + case type + when Type, Table + type + else + message = "#{error_message_tag} invalid type: #{type.grn_inspect}" + raise InvalidArgument, message + end + end + + def parse_flags(flags_raw) + Column.parse_flags(error_message_tag, flags_raw) + end + + def error_message_tag + "[logical_select][columns][#{@stage}][#{@label}]" + end + end + + class PlainDrilldownExecuteContext + include KeysParsable + include Calculatable + + attr_reader :keys + attr_reader :offset + attr_reader :limit + attr_reader :sort_keys + attr_reader :output_columns + attr_reader :calc_target_name + attr_reader :calc_types + attr_reader :filter + attr_reader :result_sets + attr_reader :unsorted_result_sets + attr_reader :temporary_tables + attr_reader :expressions + def initialize(input) + @input = input + @keys = parse_keys(@input[:drilldown]) + @offset = (@input[:drilldown_offset] || 0).to_i + @limit = (@input[:drilldown_limit] || 10).to_i + @sort_keys = parse_keys(@input[:drilldown_sort_keys] || + @input[:drilldown_sortby]) + @output_columns = @input[:drilldown_output_columns] + @output_columns ||= "_key, _nsubrecs" + @calc_target_name = @input[:drilldown_calc_target] + @calc_types = 
parse_calc_types(@input[:drilldown_calc_types]) + @filter = @input[:drilldown_filter] + + @result_sets = [] + @unsorted_result_sets = [] + + @temporary_tables = [] + + @expressions = [] + end + + def close + @result_sets.each do |result_set| + result_set.close + end + @unsorted_result_sets.each do |result_set| + result_set.close + end + + @temporary_tables.each do |table| + table.close + end + + @expressions.each do |expression| + expression.close + end + end + + def have_keys? + @keys.size > 0 + end + + def n_result_sets + @result_sets.size + end + end + + class LabeledDrilldowns + include Enumerable + include TSort + + class << self + def parse(input) + parser = LabeledArgumentParser.new(input) + drilldowns = parser.parse(/drilldowns?/) + + contexts = {} + drilldowns.each do |label, parameters| + next if parameters["keys"].nil? + context = LabeledDrilldownExecuteContext.new(label, parameters) + contexts[label] = context + end + + new(contexts) + end + end + + def initialize(contexts) + @contexts = contexts + @dependencies = {} + @contexts.each do |label, context| + if context.table + depended_context = @contexts[context.table] + if depended_context.nil? + raise "Unknown drilldown: <#{context.table}>" + end + @dependencies[label] = [depended_context] + else + @dependencies[label] = [] + end + end + end + + def close + @contexts.each_value do |context| + context.close + end + end + + def [](label) + @contexts[label] + end + + def have_keys? + not @contexts.empty? 
+ end + + def n_result_sets + @contexts.size + end + + def each(&block) + @contexts.each_value(&block) + end + + def tsort_each_node(&block) + @contexts.each_value(&block) + end + + def tsort_each_child(context, &block) + @dependencies[context.label].each(&block) + end + end + + class LabeledDrilldownExecuteContext + include KeysParsable + include Calculatable + + attr_reader :label + attr_reader :keys + attr_reader :offset + attr_reader :limit + attr_reader :sort_keys + attr_reader :output_columns + attr_reader :calc_target_name + attr_reader :calc_types + attr_reader :filter + attr_reader :table + attr_reader :dynamic_columns + attr_accessor :result_set + attr_accessor :unsorted_result_set + attr_reader :temporary_tables + attr_reader :expressions + def initialize(label, parameters) + @label = label + @keys = parse_keys(parameters["keys"]) + @offset = (parameters["offset"] || 0).to_i + @limit = (parameters["limit"] || 10).to_i + @sort_keys = parse_keys(parameters["sort_keys"] || + parameters["sortby"]) + @output_columns = parameters["output_columns"] + @output_columns ||= "_key, _nsubrecs" + @calc_target_name = parameters["calc_target"] + @calc_types = parse_calc_types(parameters["calc_types"]) + @filter = parameters["filter"] + @table = parameters["table"] + + @dynamic_columns = DynamicColumns.parse(parameters) + + @result_set = nil + @unsorted_result_set = nil + + @temporary_tables = [] + + @expressions = [] + end + + def close + @result_set.close if @result_set + @unsorted_result_set.close if @unsorted_result_set + + @dynamic_columns.close + + @temporary_tables.each do |table| + table.close + end + + @expressions.each do |expression| + expression.close + end + end + + def need_command_version2? + /[.\[]/ === @output_columns + end + + def output_columns_v2 + columns = @output_columns.strip.split(/ *, */) + converted_columns = columns.collect do |column| + match_data = /\A_value\.(.+)\z/.match(column) + if match_data.nil? 
+ column + else + nth_key = keys.index(match_data[1]) + if nth_key + "_key[#{nth_key}]" + else + column + end + end + end + converted_columns.join(",") + end + end + + class Executor + def initialize(context) + @context = context + end + + def execute + execute_search + if @context.plain_drilldown.have_keys? + execute_plain_drilldown + elsif @context.labeled_drilldowns.have_keys? + execute_labeled_drilldowns + end + end + + private + def execute_search + first_shard = nil + enumerator = @context.enumerator + enumerator.each do |shard, shard_range| + first_shard ||= shard + shard_executor = ShardExecutor.new(@context, shard, shard_range) + shard_executor.execute + end + if first_shard.nil? + message = + "[logical_select] no shard exists: " + + "logical_table: <#{enumerator.logical_table}>: " + + "shard_key: <#{enumerator.shard_key_name}>" + raise InvalidArgument, message + end + if @context.result_sets.empty? + result_set = HashTable.create(:flags => ObjectFlags::WITH_SUBREC, + :key_type => first_shard.table) + @context.dynamic_columns.each_initial do |dynamic_column| + dynamic_column.apply(result_set) + end + @context.dynamic_columns.each_filtered do |dynamic_column| + dynamic_column.apply(result_set) + end + @context.result_sets << result_set + end + end + + def execute_plain_drilldown + drilldown = @context.plain_drilldown + group_result = TableGroupResult.new + begin + group_result.key_begin = 0 + group_result.key_end = 0 + group_result.limit = 1 + group_result.flags = drilldown.calc_types + drilldown.keys.each do |key| + @context.result_sets.each do |result_set| + with_calc_target(group_result, + drilldown.calc_target(result_set)) do + result_set.group([key], group_result) + end + end + result_set = group_result.table + result_set = apply_drilldown_filter(drilldown, result_set) + if drilldown.sort_keys.empty? 
+ drilldown.result_sets << result_set + else + drilldown.result_sets << result_set.sort(drilldown.sort_keys) + drilldown.unsorted_result_sets << result_set + end + group_result.table = nil + end + ensure + group_result.close + end + end + + def execute_labeled_drilldowns + drilldowns = @context.labeled_drilldowns + + drilldowns.tsort_each do |drilldown| + group_result = TableGroupResult.new + keys = drilldown.keys + begin + group_result.key_begin = 0 + group_result.key_end = keys.size - 1 + if keys.size > 1 + group_result.max_n_sub_records = 1 + end + group_result.limit = 1 + group_result.flags = drilldown.calc_types + if drilldown.table + target_table = drilldowns[drilldown.table].result_set + with_calc_target(group_result, + drilldown.calc_target(target_table)) do + target_table.group(keys, group_result) + end + else + @context.result_sets.each do |result_set| + with_calc_target(group_result, + drilldown.calc_target(result_set)) do + result_set.group(keys, group_result) + end + end + end + result_set = group_result.table + drilldown.dynamic_columns.each_initial do |dynamic_column| + dynamic_column.apply(result_set) + end + result_set = apply_drilldown_filter(drilldown, result_set) + if drilldown.sort_keys.empty? + drilldown.result_set = result_set + else + drilldown.result_set = result_set.sort(drilldown.sort_keys) + drilldown.unsorted_result_set = result_set + end + group_result.table = nil + ensure + group_result.close + end + end + end + + def with_calc_target(group_result, calc_target) + group_result.calc_target = calc_target + begin + yield + ensure + calc_target.close if calc_target + group_result.calc_target = nil + end + end + + def apply_drilldown_filter(drilldown, result_set) + filter = drilldown.filter + return result_set if filter.nil? 
+ + expression = Expression.create(result_set) + drilldown.expressions << expression + expression.parse(filter) + filtered_result_set = result_set.select(expression) + drilldown.temporary_tables << result_set + filtered_result_set + end + end + + class ShardExecutor + def initialize(context, shard, shard_range) + @context = context + @shard = shard + @shard_range = shard_range + + @target_table = @shard.table + + @match_columns = @context.match_columns + @query = @context.query + @filter = @context.filter + @sort_keys = @context.sort_keys + @result_sets = @context.result_sets + @unsorted_result_sets = @context.unsorted_result_sets + + @target_range = @context.enumerator.target_range + + @cover_type = @target_range.cover_type(@shard_range) + end + + def execute + return if @cover_type == :none + return if @target_table.empty? + + shard_key = @shard.key + if shard_key.nil? + message = "[logical_select] shard_key doesn't exist: " + + "<#{@shard.key_name}>" + raise InvalidArgument, message + end + + @context.dynamic_columns.each_initial do |dynamic_column| + if @target_table == @shard.table + @target_table = create_all_match_table(@target_table) + @context.temporary_tables << @target_table + end + dynamic_column.apply(@target_table) + end + + create_expression_builder(shard_key) do |expression_builder| + case @cover_type + when :all + filter_shard_all(expression_builder) + when :partial_min + filter_table do |expression| + expression_builder.build_partial_min(expression) + end + when :partial_max + filter_table do |expression| + expression_builder.build_partial_max(expression) + end + when :partial_min_and_max + filter_table do |expression| + expression_builder.build_partial_min_and_max(expression) + end + end + end + end + + private + def filter_shard_all(expression_builder) + if @query.nil? and @filter.nil? 
+ add_result_set(@target_table, nil) + @context.temporary_tables.delete(@target_table) + else + filter_table do |expression| + expression_builder.build_all(expression) + end + end + end + + def create_expression(table) + expression = Expression.create(table) + @context.expressions << expression + expression + end + + def create_expression_builder(shard_key) + expression_builder = RangeExpressionBuilder.new(shard_key, + @target_range) + expression_builder.match_columns = @match_columns + expression_builder.query = @query + expression_builder.filter = @filter + begin + yield(expression_builder) + ensure + expression = expression_builder.match_columns_expression + @context.expressions << expression if expression + end + end + + def filter_table + table = @target_table + expression = create_expression(table) + yield(expression) + add_result_set(table.select(expression), expression) + end + + def add_result_set(result_set, condition) + if result_set.empty? + result_set.close + return + end + + @context.dynamic_columns.each_filtered do |dynamic_column| + if result_set == @shard.table + @context.temporary_tables << result_set + result_set = create_all_match_table(result_set) + end + dynamic_column.apply(result_set, condition) + end + + if @sort_keys.empty? 
+ @result_sets << result_set + else + @unsorted_result_sets << result_set + sorted_result_set = result_set.sort(@sort_keys) + @result_sets << sorted_result_set + end + end + + def create_all_match_table(table) + expression = Expression.create(table) + begin + expression.append_constant(true, Operator::PUSH, 1) + table.select(expression) + ensure + expression.close + end + end + end + end + end +end diff --git a/storage/mroonga/vendor/groonga/plugins/sharding/logical_shard_list.rb b/storage/mroonga/vendor/groonga/plugins/sharding/logical_shard_list.rb new file mode 100644 index 00000000..b8ef3f76 --- /dev/null +++ b/storage/mroonga/vendor/groonga/plugins/sharding/logical_shard_list.rb @@ -0,0 +1,28 @@ +module Groonga + module Sharding + class LogicalShardListCommand < Command + register("logical_shard_list", + [ + "logical_table", + ]) + + def run_body(input) + enumerator = LogicalEnumerator.new("logical_shard_list", + input, + :require_shard_key => false) + shard_names = enumerator.collect do |current_shard, shard_range| + current_shard.table_name + end + + writer.array("shards", shard_names.size) do + shard_names.each do |shard_name| + writer.map("shard", 1) do + writer.write("name") + writer.write(shard_name) + end + end + end + end + end + end +end diff --git a/storage/mroonga/vendor/groonga/plugins/sharding/logical_table_remove.rb b/storage/mroonga/vendor/groonga/plugins/sharding/logical_table_remove.rb new file mode 100644 index 00000000..3353d6c3 --- /dev/null +++ b/storage/mroonga/vendor/groonga/plugins/sharding/logical_table_remove.rb @@ -0,0 +1,345 @@ +module Groonga + module Sharding + class LogicalTableRemoveCommand < Command + register("logical_table_remove", + [ + "logical_table", + "shard_key", + "min", + "min_border", + "max", + "max_border", + "dependent", + "force", + ]) + + def run_body(input) + @dependent = (input[:dependent] == "yes") + @force = (input[:force] == "yes") + + enumerator = LogicalEnumerator.new("logical_table_remove", input) + + 
success = true + enumerator.each do |shard, shard_range| + remove_shard(shard, shard_range, enumerator.target_range) + end + writer.write(success) + end + + private + def remove_shard(shard, shard_range, target_range) + cover_type = target_range.cover_type(shard_range) + return if cover_type == :none + + shard_key = shard.key + if shard_key.nil? + if @force + context.clear_error + else + message = + "[logical_table_remove] shard_key doesn't exist: " + + "<#{shard.key_name}>" + raise InvalidArgument, message + end + end + table = shard.table + + if cover_type == :all or ((table.nil? or shard_key.nil?) and @force) + remove_table(shard, table) + return + end + + expression_builder = RangeExpressionBuilder.new(shard_key, + target_range) + case cover_type + when :partial_min + remove_records(table) do |expression| + expression_builder.build_partial_min(expression) + end + remove_table(shard, table) if table.empty? + when :partial_max + remove_records(table) do |expression| + expression_builder.build_partial_max(expression) + end + remove_table(shard, table) if table.empty? + when :partial_min_and_max + remove_records(table) do |expression| + expression_builder.build_partial_min_and_max(expression) + end + remove_table(shard, table) if table.empty? + end + end + + def collect_referenced_table_ids_from_index_ids(index_ids, + referenced_table_ids) + database = context.database + index_ids.each do |index_id| + index = context[index_id] + if index.nil? 
+ context.clear_error + index_name = database[index_id] + lexicon_name = index_name.split(".", 2)[0] + lexicon_id = database[lexicon_name] + referenced_table_ids << lexicon_id if lexicon_id + else + referenced_table_ids << index.domain_id + end + end + end + + def collect_referenced_table_ids_from_column_name(column_name, + referenced_table_ids) + database = context.database + column_id = database[column_name] + database.each_raw do |id, cursor| + next if ID.builtin?(id) + next if id == column_id + + context.open_temporary(id) do |object| + if object.nil? + context.clear_error + next + end + + case object + when IndexColumn + if object.source_ids.include?(column_id) + collect_referenced_table_ids_from_index_ids([id], + referenced_table_ids) + end + end + end + end + end + + def collect_referenced_table_ids_from_column(column, + referenced_table_ids) + range = column.range + case range + when nil + context.clear_error + when Table + referenced_table_ids << range.id + collect_referenced_table_ids_from_index_ids(range.index_ids, + referenced_table_ids) + end + collect_referenced_table_ids_from_index_ids(column.index_ids, + referenced_table_ids) + end + + def collect_referenced_table_ids_from_column_names(column_names) + referenced_table_ids = [] + column_names.each do |column_name| + column = context[column_name] + if column.nil? + context.clear_error + collect_referenced_table_ids_from_column_name(column_name, + referenced_table_ids) + else + collect_referenced_table_ids_from_column(column, + referenced_table_ids) + end + end + referenced_table_ids + end + + def collect_referenced_table_ids(shard, table) + return [] unless @dependent + + column_names = nil + if table + begin + column_names = table.columns.collect(&:name) + rescue + context.clear_error + end + end + if column_names.nil? + prefix = "#{shard.table_name}." 
+ column_names = [] + context.database.each_name(:prefix => prefix) do |column_name| + column_names << column_name + end + end + + collect_referenced_table_ids_from_column_names(column_names) + end + + def remove_table(shard, table) + if table.nil? + unless @force + if context.rc == Context::RC::SUCCESS.to_i + error_class = InvalidArgument + else + rc = Context::RC.find(context.rc) + error_class = rc.error_class + end + message = "[logical_table_remove] table is broken: " + + "<#{shard.table_name}>: #{context.error_message}" + raise error_class, message + end + context.clear_error + end + + referenced_table_ids = collect_referenced_table_ids(shard, table) + + if table.nil? + remove_table_force(shard.table_name) + else + options = {:dependent => @dependent} + if @force + begin + table.remove(options) + rescue + context.clear_error + table.close + remove_table_force(shard.table_name) + end + else + table.remove(options) + end + end + + remove_referenced_tables(shard, referenced_table_ids) + end + + def remove_table_force(table_name) + database = context.database + + prefix = "#{table_name}." + database.each_raw(:prefix => prefix) do |id, cursor| + column = context[id] + if column.nil? + context.clear_error + column_name = cursor.key + remove_column_force(column_name) + table = context[table_name] + if table.nil? + context.clear_error + else + table.close + end + else + remove_column(column) + end + end + + table_id = database[table_name] + return if table_id.nil? + + database.each_raw do |id, cursor| + next if ID.builtin?(id) + next if id == table_id + + context.open_temporary(id) do |object| + if object.nil? 
+ context.clear_error + next + end + + case object + when Table + if object.domain_id == table_id + begin + object.remove(:dependent => @dependent) + rescue + context.clear_error + reference_table_name = object.name + object.close + remove_table_force(reference_table_name) + end + end + when Column + if object.range_id == table_id + remove_column(object) + end + end + end + end + + Object.remove_force(table_name) + end + + def remove_column(column) + begin + column.remove(:dependent => @dependent) + rescue + context.clear_error + column_name = column.name + column.close + remove_column_force(column_name) + end + end + + def remove_column_force(column_name) + database = context.database + + column_id = database[column_name] + + column = context[column_id] + if column.nil? + context.clear_error + else + column.index_ids.each do |id| + index_column = context[id] + if index_column.nil? + context.clear_error + index_column_name = database[id] + remove_column_force(index_column_name) + else + remove_column(index_column) + end + end + column.close + end + + Object.remove_force(column_name) + end + + def remove_referenced_tables(shard, referenced_table_ids) + return if referenced_table_ids.empty? + + database = context.database + shard_suffix = shard.range_data.to_suffix + referenced_table_ids.uniq.each do |referenced_table_id| + referenced_table_name = database[referenced_table_id] + next if referenced_table_name.nil? + next unless referenced_table_name.end_with?(shard_suffix) + + referenced_table = context[referenced_table_id] + if referenced_table.nil? 
module Groonga
  module Sharding
    # Appends conditions onto an expression: an optional bound check of
    # the shard key against the target range, followed by the optional
    # query and filter conditions set through the writers.
    class RangeExpressionBuilder
      # Expression created from match_columns while a query is appended;
      # stays nil until then.
      attr_reader :match_columns_expression

      attr_writer :match_columns, :query, :filter

      # key:          object pushed as the left-hand side of the range
      #               comparisons.
      # target_range: object answering min, min_border, max and max_border.
      def initialize(key, target_range)
        @key = key
        @target_range = target_range
        @match_columns_expression = nil
        @match_columns = nil
        @query = nil
        @filter = nil
      end

      # Appends only the query/filter conditions (no range check).
      def build_all(expression)
        build_condition(expression)
      end

      # Appends "key >= min" when min_border is :include, otherwise
      # "key > min", followed by the query/filter conditions.
      def build_partial_min(expression)
        push_key_value(expression)
        expression.append_constant(@target_range.min, Operator::PUSH, 1)
        comparison =
          @target_range.min_border == :include ? Operator::GREATER_EQUAL : Operator::GREATER
        expression.append_operator(comparison, 2)
        build_condition(expression)
      end

      # Appends "key <= max" when max_border is :include, otherwise
      # "key < max", followed by the query/filter conditions.
      def build_partial_max(expression)
        push_key_value(expression)
        expression.append_constant(@target_range.max, Operator::PUSH, 1)
        comparison =
          @target_range.max_border == :include ? Operator::LESS_EQUAL : Operator::LESS
        expression.append_operator(comparison, 2)
        build_condition(expression)
      end

      # Appends a call to the "between" object looked up from the context
      # with five arguments (key value, min, min_border, max, max_border),
      # followed by the query/filter conditions.
      def build_partial_min_and_max(expression)
        between = Groonga::Context.instance["between"]
        expression.append_object(between, Operator::PUSH, 1)
        push_key_value(expression)
        range = @target_range
        expression.append_constant(range.min, Operator::PUSH, 1)
        expression.append_constant(range.min_border, Operator::PUSH, 1)
        expression.append_constant(range.max, Operator::PUSH, 1)
        expression.append_constant(range.max_border, Operator::PUSH, 1)
        expression.append_operator(Operator::CALL, 5)
        build_condition(expression)
      end

      private
      # Appends the query condition and then the filter condition, each
      # only when it has been set.
      def build_condition(expression)
        append_query(expression) if @query
        append_filter(expression) if @filter
      end

      # Pushes the key object and the operator that reads its value.
      def push_key_value(expression)
        expression.append_object(@key, Operator::PUSH, 1)
        expression.append_operator(Operator::GET_VALUE, 1)
      end

      # Parses @query in query syntax. It is ANDed with whatever was
      # already in the expression; nothing is ANDed when the expression
      # was empty beforehand.
      def append_query(expression)
        had_condition = !expression.empty?
        if @match_columns
          # The match columns expression is created on the table that the
          # domain of the expression's first element resolves to.
          table = Context.instance[expression[0].domain]
          @match_columns_expression = Expression.create(table)
          @match_columns_expression.parse(@match_columns)
        end
        flags = Expression::SYNTAX_QUERY |
                Expression::ALLOW_PRAGMA |
                Expression::ALLOW_COLUMN
        expression.parse(@query,
                         default_column: @match_columns_expression,
                         flags: flags)
        expression.append_operator(Operator::AND, 2) if had_condition
      end

      # Parses @filter with the parser's default options and ANDs it with
      # any condition already present.
      def append_filter(expression)
        had_condition = !expression.empty?
        expression.parse(@filter)
        expression.append_operator(Operator::AND, 2) if had_condition
      end
    end
  end
end