summaryrefslogtreecommitdiffstats
path: root/storage/mroonga/vendor/groonga/plugins/sharding
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--storage/mroonga/vendor/groonga/plugins/sharding.rb11
-rw-r--r--storage/mroonga/vendor/groonga/plugins/sharding/CMakeLists.txt24
-rw-r--r--storage/mroonga/vendor/groonga/plugins/sharding/Makefile.am9
-rw-r--r--storage/mroonga/vendor/groonga/plugins/sharding/logical_count.rb169
-rw-r--r--storage/mroonga/vendor/groonga/plugins/sharding/logical_enumerator.rb317
-rw-r--r--storage/mroonga/vendor/groonga/plugins/sharding/logical_parameters.rb44
-rw-r--r--storage/mroonga/vendor/groonga/plugins/sharding/logical_range_filter.rb642
-rw-r--r--storage/mroonga/vendor/groonga/plugins/sharding/logical_select.rb975
-rw-r--r--storage/mroonga/vendor/groonga/plugins/sharding/logical_shard_list.rb28
-rw-r--r--storage/mroonga/vendor/groonga/plugins/sharding/logical_table_remove.rb345
-rw-r--r--storage/mroonga/vendor/groonga/plugins/sharding/parameters.rb10
-rw-r--r--storage/mroonga/vendor/groonga/plugins/sharding/range_expression_builder.rb88
-rw-r--r--storage/mroonga/vendor/groonga/plugins/sharding/sources.am10
13 files changed, 2672 insertions, 0 deletions
diff --git a/storage/mroonga/vendor/groonga/plugins/sharding.rb b/storage/mroonga/vendor/groonga/plugins/sharding.rb
new file mode 100644
index 00000000..86401c1f
--- /dev/null
+++ b/storage/mroonga/vendor/groonga/plugins/sharding.rb
@@ -0,0 +1,11 @@
+require "sharding/parameters"
+require "sharding/range_expression_builder"
+require "sharding/logical_enumerator"
+
+require "sharding/logical_parameters"
+
+require "sharding/logical_count"
+require "sharding/logical_range_filter"
+require "sharding/logical_select"
+require "sharding/logical_shard_list"
+require "sharding/logical_table_remove"
diff --git a/storage/mroonga/vendor/groonga/plugins/sharding/CMakeLists.txt b/storage/mroonga/vendor/groonga/plugins/sharding/CMakeLists.txt
new file mode 100644
index 00000000..1131520f
--- /dev/null
+++ b/storage/mroonga/vendor/groonga/plugins/sharding/CMakeLists.txt
@@ -0,0 +1,24 @@
+# Copyright(C) 2015 Brazil
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License version 2.1 as published by the Free Software Foundation.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA
+
+if(NOT GRN_EMBED)
+ if(GRN_WITH_MRUBY)
+ set(GRN_RELATIVE_SHARDING_PLUGINS_DIR "${GRN_RELATIVE_PLUGINS_DIR}/sharding")
+
+ read_file_list(${CMAKE_CURRENT_SOURCE_DIR}/sources.am SHARDING_SCRIPTS)
+ install(FILES ${SHARDING_SCRIPTS}
+ DESTINATION "${GRN_RELATIVE_SHARDING_PLUGINS_DIR}")
+ endif()
+endif()
diff --git a/storage/mroonga/vendor/groonga/plugins/sharding/Makefile.am b/storage/mroonga/vendor/groonga/plugins/sharding/Makefile.am
new file mode 100644
index 00000000..8104ab6d
--- /dev/null
+++ b/storage/mroonga/vendor/groonga/plugins/sharding/Makefile.am
@@ -0,0 +1,9 @@
+EXTRA_DIST = \
+ CMakeLists.txt
+
+if WITH_MRUBY
+dist_sharding_plugins_DATA = \
+ $(sharding_scripts)
+endif
+
+include sources.am
diff --git a/storage/mroonga/vendor/groonga/plugins/sharding/logical_count.rb b/storage/mroonga/vendor/groonga/plugins/sharding/logical_count.rb
new file mode 100644
index 00000000..8bdd77ef
--- /dev/null
+++ b/storage/mroonga/vendor/groonga/plugins/sharding/logical_count.rb
@@ -0,0 +1,169 @@
+module Groonga
+ module Sharding
+ class LogicalCountCommand < Command
+ register("logical_count",
+ [
+ "logical_table",
+ "shard_key",
+ "min",
+ "min_border",
+ "max",
+ "max_border",
+ "filter",
+ ])
+
+ def run_body(input)
+ enumerator = LogicalEnumerator.new("logical_count", input)
+ filter = input[:filter]
+
+ total = 0
+ enumerator.each do |shard, shard_range|
+ total += count_n_records(filter, shard, shard_range,
+ enumerator.target_range)
+ end
+ writer.write(total)
+ end
+
+ private
+ def cache_key(input)
+ key = "logical_count\0"
+ key << "#{input[:logical_table]}\0"
+ key << "#{input[:shard_key]}\0"
+ key << "#{input[:min]}\0"
+ key << "#{input[:min_border]}\0"
+ key << "#{input[:max]}\0"
+ key << "#{input[:max_border]}\0"
+ key << "#{input[:filter]}\0"
+ key
+ end
+
+ def log_use_range_index(use, table_name, line, method)
+ message = "[logical_count]"
+ if use
+ message << "[range-index]"
+ else
+ message << "[select]"
+ end
+ message << " <#{table_name}>"
+ Context.instance.logger.log(Logger::Level::DEBUG,
+ __FILE__,
+ line,
+ method.to_s,
+ message)
+ end
+
+ def count_n_records(filter, shard, shard_range, target_range)
+ cover_type = target_range.cover_type(shard_range)
+ return 0 if cover_type == :none
+
+ shard_key = shard.key
+ if shard_key.nil?
+ message = "[logical_count] shard_key doesn't exist: " +
+ "<#{shard.key_name}>"
+ raise InvalidArgument, message
+ end
+ table = shard.table
+ table_name = shard.table_name
+
+ expression_builder = RangeExpressionBuilder.new(shard_key,
+ target_range)
+ expression_builder.filter = filter
+ if cover_type == :all
+ log_use_range_index(false, table_name, __LINE__, __method__)
+ if filter.nil?
+ return table.size
+ else
+ return filtered_count_n_records(table) do |expression|
+ expression_builder.build_all(expression)
+ end
+ end
+ end
+
+ range_index = nil
+ if filter.nil?
+ index_info = shard_key.find_index(Operator::LESS)
+ if index_info
+ range_index = index_info.index
+ end
+ end
+
+ use_range_index = (!range_index.nil?)
+ log_use_range_index(use_range_index, table_name, __LINE__, __method__)
+
+ case cover_type
+ when :partial_min
+ if range_index
+ count_n_records_in_range(range_index,
+ target_range.min, target_range.min_border,
+ nil, nil)
+ else
+ filtered_count_n_records(table) do |expression|
+ expression_builder.build_partial_min(expression)
+ end
+ end
+ when :partial_max
+ if range_index
+ count_n_records_in_range(range_index,
+ nil, nil,
+ target_range.max, target_range.max_border)
+ else
+ filtered_count_n_records(table) do |expression|
+ expression_builder.build_partial_max(expression)
+ end
+ end
+ when :partial_min_and_max
+ if range_index
+ count_n_records_in_range(range_index,
+ target_range.min, target_range.min_border,
+ target_range.max, target_range.max_border)
+ else
+ filtered_count_n_records(table) do |expression|
+ expression_builder.build_partial_min_and_max(expression)
+ end
+ end
+ end
+ end
+
+ def filtered_count_n_records(table)
+ expression = nil
+ filtered_table = nil
+
+ begin
+ expression = Expression.create(table)
+ yield(expression)
+ filtered_table = table.select(expression)
+ filtered_table.size
+ ensure
+ filtered_table.close if filtered_table
+ expression.close if expression
+ end
+ end
+
+ def count_n_records_in_range(range_index,
+ min, min_border, max, max_border)
+ flags = TableCursorFlags::BY_KEY
+ case min_border
+ when :include
+ flags |= TableCursorFlags::GE
+ when :exclude
+ flags |= TableCursorFlags::GT
+ end
+ case max_border
+ when :include
+ flags |= TableCursorFlags::LE
+ when :exclude
+ flags |= TableCursorFlags::LT
+ end
+
+ TableCursor.open(range_index.table,
+ :min => min,
+ :max => max,
+ :flags => flags) do |table_cursor|
+ IndexCursor.open(table_cursor, range_index) do |index_cursor|
+ index_cursor.count
+ end
+ end
+ end
+ end
+ end
+end
diff --git a/storage/mroonga/vendor/groonga/plugins/sharding/logical_enumerator.rb b/storage/mroonga/vendor/groonga/plugins/sharding/logical_enumerator.rb
new file mode 100644
index 00000000..d05a220f
--- /dev/null
+++ b/storage/mroonga/vendor/groonga/plugins/sharding/logical_enumerator.rb
@@ -0,0 +1,317 @@
+module Groonga
+ module Sharding
+ class LogicalEnumerator
+ include Enumerable
+
+ attr_reader :target_range
+ attr_reader :logical_table
+ attr_reader :shard_key_name
+ def initialize(command_name, input, options={})
+ @command_name = command_name
+ @input = input
+ @options = options
+ initialize_parameters
+ end
+
+ def each(&block)
+ each_internal(:ascending, &block)
+ end
+
+ def reverse_each(&block)
+ each_internal(:descending, &block)
+ end
+
+ private
+ def each_internal(order)
+ context = Context.instance
+ each_shard_with_around(order) do |prev_shard, current_shard, next_shard|
+ shard_range_data = current_shard.range_data
+ shard_range = nil
+
+ if shard_range_data.day.nil?
+ if order == :ascending
+ if next_shard
+ next_shard_range_data = next_shard.range_data
+ else
+ next_shard_range_data = nil
+ end
+ else
+ if prev_shard
+ next_shard_range_data = prev_shard.range_data
+ else
+ next_shard_range_data = nil
+ end
+ end
+ max_day = compute_month_shard_max_day(shard_range_data.year,
+ shard_range_data.month,
+ next_shard_range_data)
+ shard_range = MonthShardRange.new(shard_range_data.year,
+ shard_range_data.month,
+ max_day)
+ else
+ shard_range = DayShardRange.new(shard_range_data.year,
+ shard_range_data.month,
+ shard_range_data.day)
+ end
+
+ yield(current_shard, shard_range)
+ end
+ end
+
+ def each_shard_with_around(order)
+ context = Context.instance
+ prefix = "#{@logical_table}_"
+
+ shards = [nil]
+ context.database.each_name(:prefix => prefix,
+ :order_by => :key,
+ :order => order) do |name|
+ shard_range_raw = name[prefix.size..-1]
+
+ case shard_range_raw
+ when /\A(\d{4})(\d{2})\z/
+ shard_range_data = ShardRangeData.new($1.to_i, $2.to_i, nil)
+ when /\A(\d{4})(\d{2})(\d{2})\z/
+ shard_range_data = ShardRangeData.new($1.to_i, $2.to_i, $3.to_i)
+ else
+ next
+ end
+
+ shards << Shard.new(name, @shard_key_name, shard_range_data)
+ next if shards.size < 3
+ yield(*shards)
+ shards.shift
+ end
+
+ if shards.size == 2
+ yield(shards[0], shards[1], nil)
+ end
+ end
+
+ private
+ def initialize_parameters
+ @logical_table = @input[:logical_table]
+ if @logical_table.nil?
+ raise InvalidArgument, "[#{@command_name}] logical_table is missing"
+ end
+
+ @shard_key_name = @input[:shard_key]
+ if @shard_key_name.nil?
+ require_shard_key = @options[:require_shard_key]
+ require_shard_key = true if require_shard_key.nil?
+ if require_shard_key
+ raise InvalidArgument, "[#{@command_name}] shard_key is missing"
+ end
+ end
+
+ @target_range = TargetRange.new(@command_name, @input)
+ end
+
+ def compute_month_shard_max_day(year, month, next_shard_range)
+ return nil if next_shard_range.nil?
+
+ return nil if month != next_shard_range.month
+
+ next_shard_range.day
+ end
+
+ class Shard
+ attr_reader :table_name, :key_name, :range_data
+ def initialize(table_name, key_name, range_data)
+ @table_name = table_name
+ @key_name = key_name
+ @range_data = range_data
+ end
+
+ def table
+ @table ||= Context.instance[@table_name]
+ end
+
+ def full_key_name
+ "#{@table_name}.#{@key_name}"
+ end
+
+ def key
+ @key ||= Context.instance[full_key_name]
+ end
+ end
+
+ class ShardRangeData
+ attr_reader :year, :month, :day
+ def initialize(year, month, day)
+ @year = year
+ @month = month
+ @day = day
+ end
+
+ def to_suffix
+ if @day.nil?
+ "_%04d%02d" % [@year, @month]
+ else
+ "_%04d%02d%02d" % [@year, @month, @day]
+ end
+ end
+ end
+
+ class DayShardRange
+ attr_reader :year, :month, :day
+ def initialize(year, month, day)
+ @year = year
+ @month = month
+ @day = day
+ end
+
+ def least_over_time
+ next_day = Time.local(@year, @month, @day) + (60 * 60 * 24)
+ while next_day.day == @day # For leap second
+ next_day += 1
+ end
+ next_day
+ end
+
+ def min_time
+ Time.local(@year, @month, @day)
+ end
+
+ def include?(time)
+ @year == time.year and
+ @month == time.month and
+ @day == time.day
+ end
+ end
+
+ class MonthShardRange
+ attr_reader :year, :month, :max_day
+ def initialize(year, month, max_day)
+ @year = year
+ @month = month
+ @max_day = max_day
+ end
+
+ def least_over_time
+ if @max_day.nil?
+ if @month == 12
+ Time.local(@year + 1, 1, 1)
+ else
+ Time.local(@year, @month + 1, 1)
+ end
+ else
+ Time.local(@year, @month, @max_day)
+ end
+ end
+
+ def min_time
+ Time.local(@year, @month, 1)
+ end
+
+ def include?(time)
+ return false unless @year == time.year
+ return false unless @month == time.month
+
+ if @max_day.nil?
+ true
+ else
+ time.day <= @max_day
+ end
+ end
+ end
+
+ class TargetRange
+ attr_reader :min, :min_border
+ attr_reader :max, :max_border
+ def initialize(command_name, input)
+ @command_name = command_name
+ @input = input
+ @min = parse_value(:min)
+ @min_border = parse_border(:min_border)
+ @max = parse_value(:max)
+ @max_border = parse_border(:max_border)
+ end
+
+ def cover_type(shard_range)
+ return :all if @min.nil? and @max.nil?
+
+ if @min and @max
+ return :none unless in_min?(shard_range)
+ return :none unless in_max?(shard_range)
+ min_partial_p = in_min_partial?(shard_range)
+ max_partial_p = in_max_partial?(shard_range)
+ if min_partial_p and max_partial_p
+ :partial_min_and_max
+ elsif min_partial_p
+ :partial_min
+ elsif max_partial_p
+ :partial_max
+ else
+ :all
+ end
+ elsif @min
+ return :none unless in_min?(shard_range)
+ if in_min_partial?(shard_range)
+ :partial_min
+ else
+ :all
+ end
+ else
+ return :none unless in_max?(shard_range)
+ if in_max_partial?(shard_range)
+ :partial_max
+ else
+ :all
+ end
+ end
+ end
+
+ private
+ def parse_value(name)
+ value = @input[name]
+ return nil if value.nil?
+
+ Converter.convert(value, Time)
+ end
+
+ def parse_border(name)
+ border = @input[name]
+ return :include if border.nil?
+
+ case border
+ when "include"
+ :include
+ when "exclude"
+ :exclude
+ else
+ message =
+ "[#{@command_name}] #{name} must be \"include\" or \"exclude\": " +
+ "<#{border}>"
+ raise InvalidArgument, message
+ end
+ end
+
+ def in_min?(shard_range)
+ @min < shard_range.least_over_time
+ end
+
+ def in_min_partial?(shard_range)
+ return false unless shard_range.include?(@min)
+
+ return true if @min_border == :exclude
+
+ shard_range.min_time != @min
+ end
+
+ def in_max?(shard_range)
+ max_base_time = shard_range.min_time
+ if @max_border == :include
+ @max >= max_base_time
+ else
+ @max > max_base_time
+ end
+ end
+
+ def in_max_partial?(shard_range)
+ shard_range.include?(@max)
+ end
+ end
+ end
+ end
+end
diff --git a/storage/mroonga/vendor/groonga/plugins/sharding/logical_parameters.rb b/storage/mroonga/vendor/groonga/plugins/sharding/logical_parameters.rb
new file mode 100644
index 00000000..75ff569b
--- /dev/null
+++ b/storage/mroonga/vendor/groonga/plugins/sharding/logical_parameters.rb
@@ -0,0 +1,44 @@
+module Groonga
+ module Sharding
+ class LogicalParametersCommand < Command
+ register("logical_parameters",
+ [
+ "range_index",
+ ])
+
+ def run_body(input)
+ range_index = parse_range_index(input[:range_index])
+
+ parameters = [
+ :range_index,
+ ]
+ writer.map("parameters", parameters.size) do
+ parameters.each do |name|
+ writer.write(name.to_s)
+ writer.write(Parameters.__send__(name))
+ end
+ end
+
+ Parameters.range_index = range_index if range_index
+ end
+
+ private
+ def parse_range_index(value)
+ case value
+ when nil
+ nil
+ when "auto"
+ :auto
+ when "always"
+ :always
+ when "never"
+ :never
+ else
+ message = "[logical_parameters][range_index] "
+ message << "must be auto, always or never: <#{value}>"
+ raise InvalidArgument, message
+ end
+ end
+ end
+ end
+end
diff --git a/storage/mroonga/vendor/groonga/plugins/sharding/logical_range_filter.rb b/storage/mroonga/vendor/groonga/plugins/sharding/logical_range_filter.rb
new file mode 100644
index 00000000..1c8f8644
--- /dev/null
+++ b/storage/mroonga/vendor/groonga/plugins/sharding/logical_range_filter.rb
@@ -0,0 +1,642 @@
+module Groonga
+ module Sharding
+ class LogicalRangeFilterCommand < Command
+ register("logical_range_filter",
+ [
+ "logical_table",
+ "shard_key",
+ "min",
+ "min_border",
+ "max",
+ "max_border",
+ "order",
+ "filter",
+ "offset",
+ "limit",
+ "output_columns",
+ "use_range_index",
+ ])
+
+ def run_body(input)
+ output_columns = input[:output_columns] || "_key, *"
+
+ context = ExecuteContext.new(input)
+ begin
+ executor = Executor.new(context)
+ executor.execute
+
+ result_sets = context.result_sets
+ n_elements = 1 # for columns
+ result_sets.each do |result_set|
+ n_elements += result_set.size
+ end
+
+ writer.array("RESULTSET", n_elements) do
+ first_result_set = result_sets.first
+ if first_result_set
+ writer.write_table_columns(first_result_set, output_columns)
+ end
+ limit = context.limit
+ if limit < 0
+ n_records = result_sets.inject(0) do |n, result_set|
+ n + result_set.size
+ end
+ limit = n_records + limit + 1
+ end
+ options = {}
+ result_sets.each do |result_set|
+ options[:limit] = limit
+ writer.write_table_records(result_set, output_columns, options)
+ limit -= result_set.size
+ break if limit <= 0
+ end
+ end
+ ensure
+ context.close
+ end
+ end
+
+ private
+ def cache_key(input)
+ key = "logical_range_filter\0"
+ key << "#{input[:logical_table]}\0"
+ key << "#{input[:shard_key]}\0"
+ key << "#{input[:min]}\0"
+ key << "#{input[:min_border]}\0"
+ key << "#{input[:max]}\0"
+ key << "#{input[:max_border]}\0"
+ key << "#{input[:order]}\0"
+ key << "#{input[:filter]}\0"
+ key << "#{input[:offset]}\0"
+ key << "#{input[:limit]}\0"
+ key << "#{input[:output_columns]}\0"
+ key << "#{input[:use_range_index]}\0"
+ key
+ end
+
+ class ExecuteContext
+ attr_reader :use_range_index
+ attr_reader :enumerator
+ attr_reader :order
+ attr_reader :filter
+ attr_reader :offset
+ attr_reader :limit
+ attr_accessor :current_offset
+ attr_accessor :current_limit
+ attr_reader :result_sets
+ attr_reader :unsorted_result_sets
+ attr_reader :threshold
+ def initialize(input)
+ @input = input
+ @use_range_index = parse_use_range_index(@input[:use_range_index])
+ @enumerator = LogicalEnumerator.new("logical_range_filter", @input)
+ @order = parse_order(@input, :order)
+ @filter = @input[:filter]
+ @offset = (@input[:offset] || 0).to_i
+ @limit = (@input[:limit] || 10).to_i
+
+ @current_offset = @offset
+ @current_limit = @limit
+
+ @result_sets = []
+ @unsorted_result_sets = []
+
+ @threshold = compute_threshold
+ end
+
+ def close
+ @unsorted_result_sets.each do |result_set|
+ result_set.close if result_set.temporary?
+ end
+ @result_sets.each do |result_set|
+ result_set.close if result_set.temporary?
+ end
+ end
+
+ private
+ def parse_use_range_index(use_range_index)
+ case use_range_index
+ when "yes"
+ true
+ when "no"
+ false
+ else
+ nil
+ end
+ end
+
+ def parse_order(input, name)
+ order = input[name]
+ return :ascending if order.nil?
+
+ case order
+ when "ascending"
+ :ascending
+ when "descending"
+ :descending
+ else
+ message =
+ "[logical_range_filter] #{name} must be " +
+ "\"ascending\" or \"descending\": <#{order}>"
+ raise InvalidArgument, message
+ end
+ end
+
+ def compute_threshold
+ threshold_env = ENV["GRN_LOGICAL_RANGE_FILTER_THRESHOLD"]
+ default_threshold = 0.2
+ (threshold_env || default_threshold).to_f
+ end
+ end
+
+ class Executor
+ def initialize(context)
+ @context = context
+ end
+
+ def execute
+ first_shard = nil
+ enumerator = @context.enumerator
+ target_range = enumerator.target_range
+ if @context.order == :descending
+ each_method = :reverse_each
+ else
+ each_method = :each
+ end
+ enumerator.send(each_method) do |shard, shard_range|
+ first_shard ||= shard
+ shard_executor = ShardExecutor.new(@context, shard, shard_range)
+ shard_executor.execute
+ break if @context.current_limit == 0
+ end
+ if first_shard.nil?
+ message =
+ "[logical_range_filter] no shard exists: " +
+ "logical_table: <#{enumerator.logical_table}>: " +
+ "shard_key: <#{enumerator.shard_key_name}>"
+ raise InvalidArgument, message
+ end
+ if @context.result_sets.empty?
+ result_set = HashTable.create(:flags => ObjectFlags::WITH_SUBREC,
+ :key_type => first_shard.table)
+ @context.result_sets << result_set
+ end
+ end
+ end
+
+ class ShardExecutor
+ def initialize(context, shard, shard_range)
+ @context = context
+ @shard = shard
+ @shard_range = shard_range
+
+ @filter = @context.filter
+ @result_sets = @context.result_sets
+ @unsorted_result_sets = @context.unsorted_result_sets
+
+ @target_range = @context.enumerator.target_range
+
+ @cover_type = @target_range.cover_type(@shard_range)
+ end
+
+ def execute
+ return if @cover_type == :none
+ return if @shard.table.empty?
+
+ shard_key = @shard.key
+ if shard_key.nil?
+ message = "[logical_range_filter] shard_key doesn't exist: " +
+ "<#{@shard.key_name}>"
+ raise InvalidArgument, message
+ end
+
+ expression_builder = RangeExpressionBuilder.new(shard_key,
+ @target_range)
+ expression_builder.filter = @filter
+
+ index_info = shard_key.find_index(Operator::LESS)
+ if index_info
+ range_index = index_info.index
+ unless use_range_index?(range_index, expression_builder)
+ range_index = nil
+ end
+ else
+ range_index = nil
+ end
+
+ execute_filter(range_index, expression_builder)
+ end
+
+ private
+ def decide_use_range_index(use, reason, line, method)
+ message = "[logical_range_filter]"
+ if use
+ message << "[range-index] "
+ else
+ message << "[select] "
+ end
+ message << "<#{@shard.table_name}>: "
+ message << reason
+ Context.instance.logger.log(Logger::Level::DEBUG,
+ __FILE__,
+ line,
+ method.to_s,
+ message)
+
+ use
+ end
+
+ def use_range_index?(range_index, expression_builder)
+ use_range_index_parameter_message =
+ "force by use_range_index parameter"
+ case @context.use_range_index
+ when true
+ return decide_use_range_index(true,
+ use_range_index_parameter_message,
+ __LINE__, __method__)
+ when false
+ return decide_use_range_index(false,
+ use_range_index_parameter_message,
+ __LINE__, __method__)
+ end
+
+ range_index_logical_parameter_message =
+ "force by range_index logical parameter"
+ case Parameters.range_index
+ when :always
+ return decide_use_range_index(true,
+ range_index_logical_parameter_message,
+ __LINE__, __method__)
+ when :never
+ return decide_use_range_index(false,
+ range_index_logical_parameter_message,
+ __LINE__, __method__)
+ end
+
+ current_limit = @context.current_limit
+ if current_limit < 0
+ reason = "limit is negative: <#{current_limit}>"
+ return decide_use_range_index(false, reason,
+ __LINE__, __method__)
+ end
+
+ required_n_records = @context.current_offset + current_limit
+ max_n_records = @shard.table.size
+ if max_n_records <= required_n_records
+ reason = "the number of required records (#{required_n_records}) "
+ reason << ">= "
+ reason << "the number of records in shard (#{max_n_records})"
+ return decide_use_range_index(false, reason,
+ __LINE__, __method__)
+ end
+
+ threshold = @context.threshold
+ if threshold <= 0.0
+ reason = "threshold is negative: <#{threshold}>"
+ return decide_use_range_index(true, reason,
+ __LINE__, __method__)
+ end
+ if threshold >= 1.0
+ reason = "threshold (#{threshold}) >= 1.0"
+ return decide_use_range_index(false, reason,
+ __LINE__, __method__)
+ end
+
+ table = @shard.table
+ estimated_n_records = 0
+ case @cover_type
+ when :all
+ if @filter
+ create_expression(table) do |expression|
+ expression_builder.build_all(expression)
+ unless range_index_available_expression?(expression,
+ __LINE__, __method__)
+ return false
+ end
+ estimated_n_records = expression.estimate_size(table)
+ end
+ else
+ estimated_n_records = max_n_records
+ end
+ when :partial_min
+ create_expression(table) do |expression|
+ expression_builder.build_partial_min(expression)
+ unless range_index_available_expression?(expression,
+ __LINE__, __method__)
+ return false
+ end
+ estimated_n_records = expression.estimate_size(table)
+ end
+ when :partial_max
+ create_expression(table) do |expression|
+ expression_builder.build_partial_max(expression)
+ unless range_index_available_expression?(expression,
+ __LINE__, __method__)
+ return false
+ end
+ estimated_n_records = expression.estimate_size(table)
+ end
+ when :partial_min_and_max
+ create_expression(table) do |expression|
+ expression_builder.build_partial_min_and_max(expression)
+ unless range_index_available_expression?(expression,
+ __LINE__, __method__)
+ return false
+ end
+ estimated_n_records = expression.estimate_size(table)
+ end
+ end
+
+ if estimated_n_records <= required_n_records
+ reason = "the number of required records (#{required_n_records}) "
+ reason << ">= "
+ reason << "the number of estimated records (#{estimated_n_records})"
+ return decide_use_range_index(false, reason,
+ __LINE__, __method__)
+ end
+
+ hit_ratio = estimated_n_records / max_n_records.to_f
+ use_range_index_by_hit_ratio = (hit_ratio >= threshold)
+ if use_range_index_by_hit_ratio
+ relation = ">="
+ else
+ relation = "<"
+ end
+ reason = "hit ratio "
+ reason << "(#{hit_ratio}=#{estimated_n_records}/#{max_n_records}) "
+ reason << "#{relation} threshold (#{threshold})"
+ decide_use_range_index(use_range_index_by_hit_ratio, reason,
+ __LINE__, __method__)
+ end
+
+ def range_index_available_expression?(expression, line, method_name)
+ nested_reference_vector_column_accessor =
+ find_nested_reference_vector_column_accessor(expression)
+ if nested_reference_vector_column_accessor
+ reason = "nested reference vector column accessor can't be used: "
+ reason << "<#{nested_reference_vector_column_accessor.name}>"
+ return decide_use_range_index(false, reason, line, method_name)
+ end
+
+ selector_only_procedure = find_selector_only_procedure(expression)
+ if selector_only_procedure
+ reason = "selector only procedure can't be used: "
+ reason << "<#{selector_only_procedure.name}>"
+ return decide_use_range_index(false, reason, line, method_name)
+ end
+
+ true
+ end
+
+ def find_nested_reference_vector_column_accessor(expression)
+ expression.codes.each do |code|
+ value = code.value
+ next unless value.is_a?(Accessor)
+
+ sub_accessor = value
+ while sub_accessor.have_next?
+ object = sub_accessor.object
+ return value if object.is_a?(Column) and object.vector?
+ sub_accessor = sub_accessor.next
+ end
+ end
+ nil
+ end
+
+ def find_selector_only_procedure(expression)
+ expression.codes.each do |code|
+ value = code.value
+ return value if value.is_a?(Procedure) and value.selector_only?
+ end
+ nil
+ end
+
+ def execute_filter(range_index, expression_builder)
+ case @cover_type
+ when :all
+ filter_shard_all(range_index, expression_builder)
+ when :partial_min
+ if range_index
+ filter_by_range(range_index, expression_builder,
+ @target_range.min, @target_range.min_border,
+ nil, nil)
+ else
+ filter_table do |expression|
+ expression_builder.build_partial_min(expression)
+ end
+ end
+ when :partial_max
+ if range_index
+ filter_by_range(range_index, expression_builder,
+ nil, nil,
+ @target_range.max, @target_range.max_border)
+ else
+ filter_table do |expression|
+ expression_builder.build_partial_max(expression)
+ end
+ end
+ when :partial_min_and_max
+ if range_index
+ filter_by_range(range_index, expression_builder,
+ @target_range.min, @target_range.min_border,
+ @target_range.max, @target_range.max_border)
+ else
+ filter_table do |expression|
+ expression_builder.build_partial_min_and_max(expression)
+ end
+ end
+ end
+ end
+
+ def filter_shard_all(range_index, expression_builder)
+ table = @shard.table
+ if @filter.nil?
+ if table.size <= @context.current_offset
+ @context.current_offset -= table.size
+ return
+ end
+ if range_index
+ filter_by_range(range_index, expression_builder,
+ nil, nil,
+ nil, nil)
+ else
+ sort_result_set(table)
+ end
+ else
+ if range_index
+ filter_by_range(range_index, expression_builder,
+ nil, nil,
+ nil, nil)
+ else
+ filter_table do |expression|
+ expression_builder.build_all(expression)
+ end
+ end
+ end
+ end
+
+ def create_expression(table)
+ expression = Expression.create(table)
+ begin
+ yield(expression)
+ ensure
+ expression.close
+ end
+ end
+
+ def filter_by_range(range_index, expression_builder,
+ min, min_border, max, max_border)
+ lexicon = range_index.domain
+ data_table = range_index.range
+ flags = build_range_search_flags(min_border, max_border)
+
+ result_set = HashTable.create(:flags => ObjectFlags::WITH_SUBREC,
+ :key_type => data_table)
+ n_matched_records = 0
+ begin
+ TableCursor.open(lexicon,
+ :min => min,
+ :max => max,
+ :flags => flags) do |table_cursor|
+ options = {
+ :offset => @context.current_offset,
+ }
+ current_limit = @context.current_limit
+ if current_limit < 0
+ options[:limit] = data_table.size
+ else
+ options[:limit] = current_limit
+ end
+ max_n_unmatched_records =
+ compute_max_n_unmatched_records(data_table.size,
+ options[:limit])
+ options[:max_n_unmatched_records] = max_n_unmatched_records
+ if @filter
+ create_expression(data_table) do |expression|
+ expression.parse(@filter)
+ options[:expression] = expression
+ IndexCursor.open(table_cursor, range_index) do |index_cursor|
+ n_matched_records = index_cursor.select(result_set, options)
+ end
+ end
+ else
+ IndexCursor.open(table_cursor, range_index) do |index_cursor|
+ n_matched_records = index_cursor.select(result_set, options)
+ end
+ end
+ if n_matched_records == -1
+ result_set.close
+ fallback_message =
+ "fallback because there are too much unmatched records: "
+ fallback_message << "<#{max_n_unmatched_records}>"
+ decide_use_range_index(false,
+ fallback_message,
+ __LINE__, __method__)
+ execute_filter(nil, expression_builder)
+ return
+ end
+ end
+ rescue
+ result_set.close
+ raise
+ end
+
+ if n_matched_records <= @context.current_offset
+ @context.current_offset -= n_matched_records
+ result_set.close
+ return
+ end
+
+ if @context.current_offset > 0
+ @context.current_offset = 0
+ end
+ if @context.current_limit > 0
+ @context.current_limit -= result_set.size
+ end
+ @result_sets << result_set
+ end
+
+ def build_range_search_flags(min_border, max_border)
+ flags = TableCursorFlags::BY_KEY
+ case @context.order
+ when :ascending
+ flags |= TableCursorFlags::ASCENDING
+ when :descending
+ flags |= TableCursorFlags::DESCENDING
+ end
+ case min_border
+ when :include
+ flags |= TableCursorFlags::GE
+ when :exclude
+ flags |= TableCursorFlags::GT
+ end
+ case max_border
+ when :include
+ flags |= TableCursorFlags::LE
+ when :exclude
+ flags |= TableCursorFlags::LT
+ end
+ flags
+ end
+
+ def compute_max_n_unmatched_records(data_table_size, limit)
+ max_n_unmatched_records = limit * 100
+ max_n_sample_records = data_table_size
+ if max_n_sample_records > 10000
+ sample_ratio = 1 / (Math.log(data_table_size) ** 2)
+ max_n_sample_records = (max_n_sample_records * sample_ratio).ceil
+ end
+ if max_n_unmatched_records > max_n_sample_records
+ max_n_unmatched_records = max_n_sample_records
+ end
+ max_n_unmatched_records
+ end
+
+ def filter_table
+ table = @shard.table
+ create_expression(table) do |expression|
+ yield(expression)
+ result_set = table.select(expression)
+ sort_result_set(result_set)
+ end
+ end
+
+ def sort_result_set(result_set)
+ if result_set.empty?
+ result_set.close if result_set.temporary?
+ return
+ end
+
+ if result_set.size <= @context.current_offset
+ @context.current_offset -= result_set.size
+ result_set.close if result_set.temporary?
+ return
+ end
+
+ @unsorted_result_sets << result_set if result_set.temporary?
+ sort_keys = [
+ {
+ :key => @context.enumerator.shard_key_name,
+ :order => @context.order,
+ },
+ ]
+ if @context.current_limit > 0
+ limit = @context.current_limit
+ else
+ limit = result_set.size
+ end
+ sorted_result_set = result_set.sort(sort_keys,
+ :offset => @context.current_offset,
+ :limit => limit)
+ @result_sets << sorted_result_set
+ if @context.current_offset > 0
+ @context.current_offset = 0
+ end
+ if @context.current_limit > 0
+ @context.current_limit -= sorted_result_set.size
+ end
+ end
+ end
+ end
+ end
+end
diff --git a/storage/mroonga/vendor/groonga/plugins/sharding/logical_select.rb b/storage/mroonga/vendor/groonga/plugins/sharding/logical_select.rb
new file mode 100644
index 00000000..07ebf9e8
--- /dev/null
+++ b/storage/mroonga/vendor/groonga/plugins/sharding/logical_select.rb
@@ -0,0 +1,975 @@
+module Groonga
+ module Sharding
+ class LogicalSelectCommand < Command
+ register("logical_select",
+ [
+ "logical_table",
+ "shard_key",
+ "min",
+ "min_border",
+ "max",
+ "max_border",
+ "filter",
+ # Deprecated since 6.1.5. Use sort_keys instead.
+ "sortby",
+ "output_columns",
+ "offset",
+ "limit",
+ "drilldown",
+ # Deprecated since 6.1.5. Use drilldown_sort_keys instead.
+ "drilldown_sortby",
+ "drilldown_output_columns",
+ "drilldown_offset",
+ "drilldown_limit",
+ "drilldown_calc_types",
+ "drilldown_calc_target",
+ "sort_keys",
+ "drilldown_sort_keys",
+ "match_columns",
+ "query",
+ "drilldown_filter",
+ ])
+
+ def run_body(input)
+ context = ExecuteContext.new(input)
+ begin
+ executor = Executor.new(context)
+ executor.execute
+
+ n_results = 1
+ n_plain_drilldowns = context.plain_drilldown.n_result_sets
+ n_labeled_drilldowns = context.labeled_drilldowns.n_result_sets
+ if n_plain_drilldowns > 0
+ n_results += n_plain_drilldowns
+ elsif
+ if n_labeled_drilldowns > 0
+ n_results += 1
+ end
+ end
+
+ writer.array("RESULT", n_results) do
+ write_records(writer, context)
+ if n_plain_drilldowns > 0
+ write_plain_drilldowns(writer, context)
+ elsif n_labeled_drilldowns > 0
+ write_labeled_drilldowns(writer, context)
+ end
+ end
+ ensure
+ context.close
+ end
+ end
+
+ private
+ def cache_key(input)
+ sort_keys = input[:sort_keys] || input[:sortby]
+ drilldown_sort_keys =
+ input[:drilldown_sort_keys] || input[:drilldown_sortby]
+ key = "logical_select\0"
+ key << "#{input[:logical_table]}\0"
+ key << "#{input[:shard_key]}\0"
+ key << "#{input[:min]}\0"
+ key << "#{input[:min_border]}\0"
+ key << "#{input[:max]}\0"
+ key << "#{input[:max_border]}\0"
+ key << "#{input[:filter]}\0"
+ key << "#{sort_keys}\0"
+ key << "#{input[:output_columns]}\0"
+ key << "#{input[:offset]}\0"
+ key << "#{input[:limit]}\0"
+ key << "#{input[:drilldown]}\0"
+ key << "#{drilldown_sort_keys}\0"
+ key << "#{input[:match_columns]}\0"
+ key << "#{input[:query]}\0"
+ key << "#{input[:drilldown_output_columns]}\0"
+ key << "#{input[:drilldown_offset]}\0"
+ key << "#{input[:drilldown_limit]}\0"
+ key << "#{input[:drilldown_calc_types]}\0"
+ key << "#{input[:drilldown_calc_target]}\0"
+ key << "#{input[:drilldown_filter]}\0"
+ labeled_drilldowns = LabeledDrilldowns.parse(input).sort_by(&:label)
+ labeled_drilldowns.each do |drilldown|
+ key << "#{drilldown.label}\0"
+ key << "#{drilldown.keys.join(',')}\0"
+ key << "#{drilldown.output_columns}\0"
+ key << "#{drilldown.offset}\0"
+ key << "#{drilldown.limit}\0"
+ key << "#{drilldown.calc_types}\0"
+ key << "#{drilldown.calc_target_name}\0"
+ key << "#{drilldown.filter}\0"
+ cache_key_dynamic_columns(key, drilldown.dynamic_columns)
+ end
+ dynamic_columns = DynamicColumns.parse(input)
+ cache_key_dynamic_columns(key, dynamic_columns)
+ key
+ end
+
+ def cache_key_dynamic_columns(key, dynamic_columns)
+ [
+ :initial,
+ :filtered,
+ :output
+ ].each do |stage|
+ target_dynamic_columns = dynamic_columns.__send__("each_#{stage}").to_a
+ target_dynamic_columns.sort_by(&:label).each do |dynamic_column|
+ key << "#{dynamic_column.label}\0"
+ key << "#{dynamic_column.stage}\0"
+ key << "#{dynamic_column.type}\0"
+ key << "#{dynamic_column.flags}\0"
+ key << "#{dynamic_column.value}\0"
+ key << "#{dynamic_column.window_sort_keys.join(',')}\0"
+ key << "#{dynamic_column.window_group_keys.join(',')}\0"
+ end
+ end
+ end
+
+ def write_records(writer, context)
+ result_sets = context.result_sets
+
+ n_hits = 0
+ n_elements = 2 # for N hits and columns
+ result_sets.each do |result_set|
+ n_hits += result_set.size
+ n_elements += result_set.size
+ end
+
+ output_columns = context.output_columns
+
+ writer.array("RESULTSET", n_elements) do
+ writer.array("NHITS", 1) do
+ writer.write(n_hits)
+ end
+ first_result_set = result_sets.first
+ if first_result_set
+ writer.write_table_columns(first_result_set, output_columns)
+ end
+
+ current_offset = context.offset
+ current_offset += n_hits if current_offset < 0
+ current_limit = context.limit
+ current_limit += n_hits + 1 if current_limit < 0
+ options = {
+ :offset => current_offset,
+ :limit => current_limit,
+ }
+ result_sets.each do |result_set|
+ if result_set.size > current_offset
+ writer.write_table_records(result_set, output_columns, options)
+ current_limit -= result_set.size
+ end
+ if current_offset > 0
+ current_offset = [current_offset - result_set.size, 0].max
+ end
+ break if current_limit <= 0
+ options[:offset] = current_offset
+ options[:limit] = current_limit
+ end
+ end
+ end
+
+ def write_plain_drilldowns(writer, execute_context)
+ plain_drilldown = execute_context.plain_drilldown
+
+ drilldowns = plain_drilldown.result_sets
+ output_columns = plain_drilldown.output_columns
+ options = {
+ :offset => plain_drilldown.offset,
+ :limit => plain_drilldown.limit,
+ }
+
+ drilldowns.each do |drilldown|
+ n_elements = 2 # for N hits and columns
+ n_elements += drilldown.size
+ writer.array("RESULTSET", n_elements) do
+ writer.array("NHITS", 1) do
+ writer.write(drilldown.size)
+ end
+ writer.write_table_columns(drilldown, output_columns)
+ writer.write_table_records(drilldown, output_columns,
+ options)
+ end
+ end
+ end
+
+ def write_labeled_drilldowns(writer, execute_context)
+ labeled_drilldowns = execute_context.labeled_drilldowns
+ is_command_version1 = (context.command_version == 1)
+
+ writer.map("DRILLDOWNS", labeled_drilldowns.n_result_sets) do
+ labeled_drilldowns.each do |drilldown|
+ writer.write(drilldown.label)
+
+ result_set = drilldown.result_set
+ n_elements = 2 # for N hits and columns
+ n_elements += result_set.size
+ output_columns = drilldown.output_columns
+ options = {
+ :offset => drilldown.offset,
+ :limit => drilldown.limit,
+ }
+
+ writer.array("RESULTSET", n_elements) do
+ writer.array("NHITS", 1) do
+ writer.write(result_set.size)
+ end
+ writer.write_table_columns(result_set, output_columns)
+ if is_command_version1 and drilldown.need_command_version2?
+ context.with_command_version(2) do
+ writer.write_table_records(result_set,
+ drilldown.output_columns_v2,
+ options)
+ end
+ else
+ writer.write_table_records(result_set, output_columns, options)
+ end
+ end
+ end
+ end
+ end
+
+ class LabeledArgumentParser
+ def initialize(parameters)
+ @parameters = parameters
+ end
+
+ def parse(prefix_pattern)
+ pattern = /\A#{prefix_pattern}\[(.+?)\]\.(.+)\z/
+ labeled_arguments = {}
+ @parameters.each do |key, value|
+ match_data = pattern.match(key)
+ next if match_data.nil?
+ labeled_argument = (labeled_arguments[match_data[1]] ||= {})
+ labeled_argument[match_data[2]] = value
+ end
+ labeled_arguments
+ end
+ end
+
+ module KeysParsable
+ private
+ def parse_keys(raw_keys)
+ return [] if raw_keys.nil?
+
+ raw_keys.strip.split(/ *, */)
+ end
+ end
+
+ module Calculatable
+ def calc_target(table)
+ return nil if @calc_target_name.nil?
+ table.find_column(@calc_target_name)
+ end
+
+ private
+ def parse_calc_types(raw_types)
+ return TableGroupFlags::CALC_COUNT if raw_types.nil?
+
+ types = 0
+ raw_types.strip.split(/ *, */).each do |name|
+ case name
+ when "COUNT"
+ types |= TableGroupFlags::CALC_COUNT
+ when "MAX"
+ types |= TableGroupFlags::CALC_MAX
+ when "MIN"
+ types |= TableGroupFlags::CALC_MIN
+ when "SUM"
+ types |= TableGroupFlags::CALC_SUM
+ when "AVG"
+ types |= TableGroupFlags::CALC_AVG
+ when "NONE"
+ # Do nothing
+ else
+ raise InvalidArgument, "invalid drilldown calc type: <#{name}>"
+ end
+ end
+ types
+ end
+ end
+
+ class ExecuteContext
+ include KeysParsable
+
+ attr_reader :enumerator
+ attr_reader :match_columns
+ attr_reader :query
+ attr_reader :filter
+ attr_reader :offset
+ attr_reader :limit
+ attr_reader :sort_keys
+ attr_reader :output_columns
+ attr_reader :dynamic_columns
+ attr_reader :result_sets
+ attr_reader :unsorted_result_sets
+ attr_reader :plain_drilldown
+ attr_reader :labeled_drilldowns
+ attr_reader :temporary_tables
+ attr_reader :expressions
+ def initialize(input)
+ @input = input
+ @enumerator = LogicalEnumerator.new("logical_select", @input)
+ @match_columns = @input[:match_columns]
+ @query = @input[:query]
+ @filter = @input[:filter]
+ @offset = (@input[:offset] || 0).to_i
+ @limit = (@input[:limit] || 10).to_i
+ @sort_keys = parse_keys(@input[:sort_keys] || @input[:sortby])
+ @output_columns = @input[:output_columns] || "_id, _key, *"
+
+ @dynamic_columns = DynamicColumns.parse(@input)
+
+ @result_sets = []
+ @unsorted_result_sets = []
+
+ @plain_drilldown = PlainDrilldownExecuteContext.new(@input)
+ @labeled_drilldowns = LabeledDrilldowns.parse(@input)
+
+ @temporary_tables = []
+
+ @expressions = []
+ end
+
+ def close
+ @result_sets.each do |result_set|
+ result_set.close if result_set.temporary?
+ end
+ @unsorted_result_sets.each do |result_set|
+ result_set.close if result_set.temporary?
+ end
+
+ @plain_drilldown.close
+ @labeled_drilldowns.close
+
+ @dynamic_columns.close
+
+ @temporary_tables.each do |table|
+ table.close
+ end
+
+ @expressions.each do |expression|
+ expression.close
+ end
+ end
+ end
+
+ class DynamicColumns
+ class << self
+ def parse(input)
+ parser = LabeledArgumentParser.new(input)
+ columns = parser.parse(/columns?/)
+
+ initial_contexts = []
+ filtered_contexts = []
+ output_contexts = []
+ columns.each do |label, parameters|
+ contexts = nil
+ case parameters["stage"]
+ when "initial"
+ contexts = initial_contexts
+ when "filtered"
+ contexts = filtered_contexts
+ when "output"
+ contexts = output_contexts
+ else
+ next
+ end
+ contexts << DynamicColumnExecuteContext.new(label, parameters)
+ end
+
+ new(initial_contexts,
+ filtered_contexts,
+ output_contexts)
+ end
+ end
+
+ def initialize(initial_contexts,
+ filtered_contexts,
+ output_contexts)
+ @initial_contexts = initial_contexts
+ @filtered_contexts = filtered_contexts
+ @output_contexts = output_contexts
+ end
+
+ def each_initial(&block)
+ @initial_contexts.each(&block)
+ end
+
+ def each_filtered(&block)
+ @filtered_contexts.each(&block)
+ end
+
+ def each_output(&block)
+ @output_contexts.each(&block)
+ end
+
+ def close
+ @initial_contexts.each do |context|
+ context.close
+ end
+ @filtered_contexts.each do |context|
+ context.close
+ end
+ @output_contexts.each do |context|
+ context.close
+ end
+ end
+ end
+
+ class DynamicColumnExecuteContext
+ include KeysParsable
+
+ attr_reader :label
+ attr_reader :stage
+ attr_reader :type
+ attr_reader :flags
+ attr_reader :value
+ attr_reader :window_sort_keys
+ attr_reader :window_group_keys
+ def initialize(label, parameters)
+ @label = label
+ @stage = parameters["stage"]
+ @type = parse_type(parameters["type"])
+ @flags = parse_flags(parameters["flags"] || "COLUMN_SCALAR")
+ @value = parameters["value"]
+ @window_sort_keys = parse_keys(parameters["window.sort_keys"])
+ @window_group_keys = parse_keys(parameters["window.group_keys"])
+ end
+
+ def close
+ end
+
+ def apply(table, condition=nil)
+ column = table.create_column(@label, @flags, @type)
+ return if table.empty?
+
+ expression = Expression.create(table)
+ begin
+ expression.parse(@value)
+ if @window_sort_keys.empty? and @window_group_keys.empty?
+ expression.condition = condition if condition
+ table.apply_expression(column, expression)
+ else
+ table.apply_window_function(column, expression,
+ :sort_keys => @window_sort_keys,
+ :group_keys => @window_group_keys)
+ end
+ ensure
+ expression.close
+ end
+ end
+
+ private
+ def parse_type(type_raw)
+ return nil if type_raw.nil?
+
+ type = Context.instance[type_raw]
+ if type.nil?
+ message = "#{error_message_tag} unknown type: <#{type_raw}>"
+ raise InvalidArgument, message
+ end
+
+ case type
+ when Type, Table
+ type
+ else
+ message = "#{error_message_tag} invalid type: #{type.grn_inspect}"
+ raise InvalidArgument, message
+ end
+ end
+
+ def parse_flags(flags_raw)
+ Column.parse_flags(error_message_tag, flags_raw)
+ end
+
+ def error_message_tag
+ "[logical_select][columns][#{@stage}][#{@label}]"
+ end
+ end
+
+ class PlainDrilldownExecuteContext
+ include KeysParsable
+ include Calculatable
+
+ attr_reader :keys
+ attr_reader :offset
+ attr_reader :limit
+ attr_reader :sort_keys
+ attr_reader :output_columns
+ attr_reader :calc_target_name
+ attr_reader :calc_types
+ attr_reader :filter
+ attr_reader :result_sets
+ attr_reader :unsorted_result_sets
+ attr_reader :temporary_tables
+ attr_reader :expressions
+ def initialize(input)
+ @input = input
+ @keys = parse_keys(@input[:drilldown])
+ @offset = (@input[:drilldown_offset] || 0).to_i
+ @limit = (@input[:drilldown_limit] || 10).to_i
+ @sort_keys = parse_keys(@input[:drilldown_sort_keys] ||
+ @input[:drilldown_sortby])
+ @output_columns = @input[:drilldown_output_columns]
+ @output_columns ||= "_key, _nsubrecs"
+ @calc_target_name = @input[:drilldown_calc_target]
+ @calc_types = parse_calc_types(@input[:drilldown_calc_types])
+ @filter = @input[:drilldown_filter]
+
+ @result_sets = []
+ @unsorted_result_sets = []
+
+ @temporary_tables = []
+
+ @expressions = []
+ end
+
+ def close
+ @result_sets.each do |result_set|
+ result_set.close
+ end
+ @unsorted_result_sets.each do |result_set|
+ result_set.close
+ end
+
+ @temporary_tables.each do |table|
+ table.close
+ end
+
+ @expressions.each do |expression|
+ expression.close
+ end
+ end
+
+ def have_keys?
+ @keys.size > 0
+ end
+
+ def n_result_sets
+ @result_sets.size
+ end
+ end
+
+ class LabeledDrilldowns
+ include Enumerable
+ include TSort
+
+ class << self
+ def parse(input)
+ parser = LabeledArgumentParser.new(input)
+ drilldowns = parser.parse(/drilldowns?/)
+
+ contexts = {}
+ drilldowns.each do |label, parameters|
+ next if parameters["keys"].nil?
+ context = LabeledDrilldownExecuteContext.new(label, parameters)
+ contexts[label] = context
+ end
+
+ new(contexts)
+ end
+ end
+
+ def initialize(contexts)
+ @contexts = contexts
+ @dependencies = {}
+ @contexts.each do |label, context|
+ if context.table
+ depended_context = @contexts[context.table]
+ if depended_context.nil?
+ raise "Unknown drilldown: <#{context.table}>"
+ end
+ @dependencies[label] = [depended_context]
+ else
+ @dependencies[label] = []
+ end
+ end
+ end
+
+ def close
+ @contexts.each_value do |context|
+ context.close
+ end
+ end
+
+ def [](label)
+ @contexts[label]
+ end
+
+ def have_keys?
+ not @contexts.empty?
+ end
+
+ def n_result_sets
+ @contexts.size
+ end
+
+ def each(&block)
+ @contexts.each_value(&block)
+ end
+
+ def tsort_each_node(&block)
+ @contexts.each_value(&block)
+ end
+
+ def tsort_each_child(context, &block)
+ @dependencies[context.label].each(&block)
+ end
+ end
+
+ class LabeledDrilldownExecuteContext
+ include KeysParsable
+ include Calculatable
+
+ attr_reader :label
+ attr_reader :keys
+ attr_reader :offset
+ attr_reader :limit
+ attr_reader :sort_keys
+ attr_reader :output_columns
+ attr_reader :calc_target_name
+ attr_reader :calc_types
+ attr_reader :filter
+ attr_reader :table
+ attr_reader :dynamic_columns
+ attr_accessor :result_set
+ attr_accessor :unsorted_result_set
+ attr_reader :temporary_tables
+ attr_reader :expressions
+ def initialize(label, parameters)
+ @label = label
+ @keys = parse_keys(parameters["keys"])
+ @offset = (parameters["offset"] || 0).to_i
+ @limit = (parameters["limit"] || 10).to_i
+ @sort_keys = parse_keys(parameters["sort_keys"] ||
+ parameters["sortby"])
+ @output_columns = parameters["output_columns"]
+ @output_columns ||= "_key, _nsubrecs"
+ @calc_target_name = parameters["calc_target"]
+ @calc_types = parse_calc_types(parameters["calc_types"])
+ @filter = parameters["filter"]
+ @table = parameters["table"]
+
+ @dynamic_columns = DynamicColumns.parse(parameters)
+
+ @result_set = nil
+ @unsorted_result_set = nil
+
+ @temporary_tables = []
+
+ @expressions = []
+ end
+
+ def close
+ @result_set.close if @result_set
+ @unsorted_result_set.close if @unsorted_result_set
+
+ @dynamic_columns.close
+
+ @temporary_tables.each do |table|
+ table.close
+ end
+
+ @expressions.each do |expression|
+ expression.close
+ end
+ end
+
+ def need_command_version2?
+ /[.\[]/ === @output_columns
+ end
+
+ def output_columns_v2
+ columns = @output_columns.strip.split(/ *, */)
+ converted_columns = columns.collect do |column|
+ match_data = /\A_value\.(.+)\z/.match(column)
+ if match_data.nil?
+ column
+ else
+ nth_key = keys.index(match_data[1])
+ if nth_key
+ "_key[#{nth_key}]"
+ else
+ column
+ end
+ end
+ end
+ converted_columns.join(",")
+ end
+ end
+
+ class Executor
+ def initialize(context)
+ @context = context
+ end
+
+ def execute
+ execute_search
+ if @context.plain_drilldown.have_keys?
+ execute_plain_drilldown
+ elsif @context.labeled_drilldowns.have_keys?
+ execute_labeled_drilldowns
+ end
+ end
+
+ private
+ def execute_search
+ first_shard = nil
+ enumerator = @context.enumerator
+ enumerator.each do |shard, shard_range|
+ first_shard ||= shard
+ shard_executor = ShardExecutor.new(@context, shard, shard_range)
+ shard_executor.execute
+ end
+ if first_shard.nil?
+ message =
+ "[logical_select] no shard exists: " +
+ "logical_table: <#{enumerator.logical_table}>: " +
+ "shard_key: <#{enumerator.shard_key_name}>"
+ raise InvalidArgument, message
+ end
+ if @context.result_sets.empty?
+ result_set = HashTable.create(:flags => ObjectFlags::WITH_SUBREC,
+ :key_type => first_shard.table)
+ @context.dynamic_columns.each_initial do |dynamic_column|
+ dynamic_column.apply(result_set)
+ end
+ @context.dynamic_columns.each_filtered do |dynamic_column|
+ dynamic_column.apply(result_set)
+ end
+ @context.result_sets << result_set
+ end
+ end
+
+ def execute_plain_drilldown
+ drilldown = @context.plain_drilldown
+ group_result = TableGroupResult.new
+ begin
+ group_result.key_begin = 0
+ group_result.key_end = 0
+ group_result.limit = 1
+ group_result.flags = drilldown.calc_types
+ drilldown.keys.each do |key|
+ @context.result_sets.each do |result_set|
+ with_calc_target(group_result,
+ drilldown.calc_target(result_set)) do
+ result_set.group([key], group_result)
+ end
+ end
+ result_set = group_result.table
+ result_set = apply_drilldown_filter(drilldown, result_set)
+ if drilldown.sort_keys.empty?
+ drilldown.result_sets << result_set
+ else
+ drilldown.result_sets << result_set.sort(drilldown.sort_keys)
+ drilldown.unsorted_result_sets << result_set
+ end
+ group_result.table = nil
+ end
+ ensure
+ group_result.close
+ end
+ end
+
+ def execute_labeled_drilldowns
+ drilldowns = @context.labeled_drilldowns
+
+ drilldowns.tsort_each do |drilldown|
+ group_result = TableGroupResult.new
+ keys = drilldown.keys
+ begin
+ group_result.key_begin = 0
+ group_result.key_end = keys.size - 1
+ if keys.size > 1
+ group_result.max_n_sub_records = 1
+ end
+ group_result.limit = 1
+ group_result.flags = drilldown.calc_types
+ if drilldown.table
+ target_table = drilldowns[drilldown.table].result_set
+ with_calc_target(group_result,
+ drilldown.calc_target(target_table)) do
+ target_table.group(keys, group_result)
+ end
+ else
+ @context.result_sets.each do |result_set|
+ with_calc_target(group_result,
+ drilldown.calc_target(result_set)) do
+ result_set.group(keys, group_result)
+ end
+ end
+ end
+ result_set = group_result.table
+ drilldown.dynamic_columns.each_initial do |dynamic_column|
+ dynamic_column.apply(result_set)
+ end
+ result_set = apply_drilldown_filter(drilldown, result_set)
+ if drilldown.sort_keys.empty?
+ drilldown.result_set = result_set
+ else
+ drilldown.result_set = result_set.sort(drilldown.sort_keys)
+ drilldown.unsorted_result_set = result_set
+ end
+ group_result.table = nil
+ ensure
+ group_result.close
+ end
+ end
+ end
+
+ def with_calc_target(group_result, calc_target)
+ group_result.calc_target = calc_target
+ begin
+ yield
+ ensure
+ calc_target.close if calc_target
+ group_result.calc_target = nil
+ end
+ end
+
+ def apply_drilldown_filter(drilldown, result_set)
+ filter = drilldown.filter
+ return result_set if filter.nil?
+
+ expression = Expression.create(result_set)
+ drilldown.expressions << expression
+ expression.parse(filter)
+ filtered_result_set = result_set.select(expression)
+ drilldown.temporary_tables << result_set
+ filtered_result_set
+ end
+ end
+
+ class ShardExecutor
+ def initialize(context, shard, shard_range)
+ @context = context
+ @shard = shard
+ @shard_range = shard_range
+
+ @target_table = @shard.table
+
+ @match_columns = @context.match_columns
+ @query = @context.query
+ @filter = @context.filter
+ @sort_keys = @context.sort_keys
+ @result_sets = @context.result_sets
+ @unsorted_result_sets = @context.unsorted_result_sets
+
+ @target_range = @context.enumerator.target_range
+
+ @cover_type = @target_range.cover_type(@shard_range)
+ end
+
+ def execute
+ return if @cover_type == :none
+ return if @target_table.empty?
+
+ shard_key = @shard.key
+ if shard_key.nil?
+ message = "[logical_select] shard_key doesn't exist: " +
+ "<#{@shard.key_name}>"
+ raise InvalidArgument, message
+ end
+
+ @context.dynamic_columns.each_initial do |dynamic_column|
+ if @target_table == @shard.table
+ @target_table = create_all_match_table(@target_table)
+ @context.temporary_tables << @target_table
+ end
+ dynamic_column.apply(@target_table)
+ end
+
+ create_expression_builder(shard_key) do |expression_builder|
+ case @cover_type
+ when :all
+ filter_shard_all(expression_builder)
+ when :partial_min
+ filter_table do |expression|
+ expression_builder.build_partial_min(expression)
+ end
+ when :partial_max
+ filter_table do |expression|
+ expression_builder.build_partial_max(expression)
+ end
+ when :partial_min_and_max
+ filter_table do |expression|
+ expression_builder.build_partial_min_and_max(expression)
+ end
+ end
+ end
+ end
+
+ private
+ def filter_shard_all(expression_builder)
+ if @query.nil? and @filter.nil?
+ add_result_set(@target_table, nil)
+ @context.temporary_tables.delete(@target_table)
+ else
+ filter_table do |expression|
+ expression_builder.build_all(expression)
+ end
+ end
+ end
+
+ def create_expression(table)
+ expression = Expression.create(table)
+ @context.expressions << expression
+ expression
+ end
+
+ def create_expression_builder(shard_key)
+ expression_builder = RangeExpressionBuilder.new(shard_key,
+ @target_range)
+ expression_builder.match_columns = @match_columns
+ expression_builder.query = @query
+ expression_builder.filter = @filter
+ begin
+ yield(expression_builder)
+ ensure
+ expression = expression_builder.match_columns_expression
+ @context.expressions << expression if expression
+ end
+ end
+
+ def filter_table
+ table = @target_table
+ expression = create_expression(table)
+ yield(expression)
+ add_result_set(table.select(expression), expression)
+ end
+
+ def add_result_set(result_set, condition)
+ if result_set.empty?
+ result_set.close
+ return
+ end
+
+ @context.dynamic_columns.each_filtered do |dynamic_column|
+ if result_set == @shard.table
+ @context.temporary_tables << result_set
+ result_set = create_all_match_table(result_set)
+ end
+ dynamic_column.apply(result_set, condition)
+ end
+
+ if @sort_keys.empty?
+ @result_sets << result_set
+ else
+ @unsorted_result_sets << result_set
+ sorted_result_set = result_set.sort(@sort_keys)
+ @result_sets << sorted_result_set
+ end
+ end
+
+ def create_all_match_table(table)
+ expression = Expression.create(table)
+ begin
+ expression.append_constant(true, Operator::PUSH, 1)
+ table.select(expression)
+ ensure
+ expression.close
+ end
+ end
+ end
+ end
+ end
+end
diff --git a/storage/mroonga/vendor/groonga/plugins/sharding/logical_shard_list.rb b/storage/mroonga/vendor/groonga/plugins/sharding/logical_shard_list.rb
new file mode 100644
index 00000000..b8ef3f76
--- /dev/null
+++ b/storage/mroonga/vendor/groonga/plugins/sharding/logical_shard_list.rb
@@ -0,0 +1,28 @@
+module Groonga
+ module Sharding
+ class LogicalShardListCommand < Command
+ register("logical_shard_list",
+ [
+ "logical_table",
+ ])
+
+ def run_body(input)
+ enumerator = LogicalEnumerator.new("logical_shard_list",
+ input,
+ :require_shard_key => false)
+ shard_names = enumerator.collect do |current_shard, shard_range|
+ current_shard.table_name
+ end
+
+ writer.array("shards", shard_names.size) do
+ shard_names.each do |shard_name|
+ writer.map("shard", 1) do
+ writer.write("name")
+ writer.write(shard_name)
+ end
+ end
+ end
+ end
+ end
+ end
+end
diff --git a/storage/mroonga/vendor/groonga/plugins/sharding/logical_table_remove.rb b/storage/mroonga/vendor/groonga/plugins/sharding/logical_table_remove.rb
new file mode 100644
index 00000000..3353d6c3
--- /dev/null
+++ b/storage/mroonga/vendor/groonga/plugins/sharding/logical_table_remove.rb
@@ -0,0 +1,345 @@
+module Groonga
+ module Sharding
+ class LogicalTableRemoveCommand < Command
+ register("logical_table_remove",
+ [
+ "logical_table",
+ "shard_key",
+ "min",
+ "min_border",
+ "max",
+ "max_border",
+ "dependent",
+ "force",
+ ])
+
+ def run_body(input)
+ @dependent = (input[:dependent] == "yes")
+ @force = (input[:force] == "yes")
+
+ enumerator = LogicalEnumerator.new("logical_table_remove", input)
+
+ success = true
+ enumerator.each do |shard, shard_range|
+ remove_shard(shard, shard_range, enumerator.target_range)
+ end
+ writer.write(success)
+ end
+
+ private
+ def remove_shard(shard, shard_range, target_range)
+ cover_type = target_range.cover_type(shard_range)
+ return if cover_type == :none
+
+ shard_key = shard.key
+ if shard_key.nil?
+ if @force
+ context.clear_error
+ else
+ message =
+ "[logical_table_remove] shard_key doesn't exist: " +
+ "<#{shard.key_name}>"
+ raise InvalidArgument, message
+ end
+ end
+ table = shard.table
+
+ if cover_type == :all or ((table.nil? or shard_key.nil?) and @force)
+ remove_table(shard, table)
+ return
+ end
+
+ expression_builder = RangeExpressionBuilder.new(shard_key,
+ target_range)
+ case cover_type
+ when :partial_min
+ remove_records(table) do |expression|
+ expression_builder.build_partial_min(expression)
+ end
+ remove_table(shard, table) if table.empty?
+ when :partial_max
+ remove_records(table) do |expression|
+ expression_builder.build_partial_max(expression)
+ end
+ remove_table(shard, table) if table.empty?
+ when :partial_min_and_max
+ remove_records(table) do |expression|
+ expression_builder.build_partial_min_and_max(expression)
+ end
+ remove_table(shard, table) if table.empty?
+ end
+ end
+
+ def collect_referenced_table_ids_from_index_ids(index_ids,
+ referenced_table_ids)
+ database = context.database
+ index_ids.each do |index_id|
+ index = context[index_id]
+ if index.nil?
+ context.clear_error
+ index_name = database[index_id]
+ lexicon_name = index_name.split(".", 2)[0]
+ lexicon_id = database[lexicon_name]
+ referenced_table_ids << lexicon_id if lexicon_id
+ else
+ referenced_table_ids << index.domain_id
+ end
+ end
+ end
+
+ def collect_referenced_table_ids_from_column_name(column_name,
+ referenced_table_ids)
+ database = context.database
+ column_id = database[column_name]
+ database.each_raw do |id, cursor|
+ next if ID.builtin?(id)
+ next if id == column_id
+
+ context.open_temporary(id) do |object|
+ if object.nil?
+ context.clear_error
+ next
+ end
+
+ case object
+ when IndexColumn
+ if object.source_ids.include?(column_id)
+ collect_referenced_table_ids_from_index_ids([id],
+ referenced_table_ids)
+ end
+ end
+ end
+ end
+ end
+
+ def collect_referenced_table_ids_from_column(column,
+ referenced_table_ids)
+ range = column.range
+ case range
+ when nil
+ context.clear_error
+ when Table
+ referenced_table_ids << range.id
+ collect_referenced_table_ids_from_index_ids(range.index_ids,
+ referenced_table_ids)
+ end
+ collect_referenced_table_ids_from_index_ids(column.index_ids,
+ referenced_table_ids)
+ end
+
+ def collect_referenced_table_ids_from_column_names(column_names)
+ referenced_table_ids = []
+ column_names.each do |column_name|
+ column = context[column_name]
+ if column.nil?
+ context.clear_error
+ collect_referenced_table_ids_from_column_name(column_name,
+ referenced_table_ids)
+ else
+ collect_referenced_table_ids_from_column(column,
+ referenced_table_ids)
+ end
+ end
+ referenced_table_ids
+ end
+
+ def collect_referenced_table_ids(shard, table)
+ return [] unless @dependent
+
+ column_names = nil
+ if table
+ begin
+ column_names = table.columns.collect(&:name)
+ rescue
+ context.clear_error
+ end
+ end
+ if column_names.nil?
+ prefix = "#{shard.table_name}."
+ column_names = []
+ context.database.each_name(:prefix => prefix) do |column_name|
+ column_names << column_name
+ end
+ end
+
+ collect_referenced_table_ids_from_column_names(column_names)
+ end
+
+ def remove_table(shard, table)
+ if table.nil?
+ unless @force
+ if context.rc == Context::RC::SUCCESS.to_i
+ error_class = InvalidArgument
+ else
+ rc = Context::RC.find(context.rc)
+ error_class = rc.error_class
+ end
+ message = "[logical_table_remove] table is broken: " +
+ "<#{shard.table_name}>: #{context.error_message}"
+ raise error_class, message
+ end
+ context.clear_error
+ end
+
+ referenced_table_ids = collect_referenced_table_ids(shard, table)
+
+ if table.nil?
+ remove_table_force(shard.table_name)
+ else
+ options = {:dependent => @dependent}
+ if @force
+ begin
+ table.remove(options)
+ rescue
+ context.clear_error
+ table.close
+ remove_table_force(shard.table_name)
+ end
+ else
+ table.remove(options)
+ end
+ end
+
+ remove_referenced_tables(shard, referenced_table_ids)
+ end
+
+ def remove_table_force(table_name)
+ database = context.database
+
+ prefix = "#{table_name}."
+ database.each_raw(:prefix => prefix) do |id, cursor|
+ column = context[id]
+ if column.nil?
+ context.clear_error
+ column_name = cursor.key
+ remove_column_force(column_name)
+ table = context[table_name]
+ if table.nil?
+ context.clear_error
+ else
+ table.close
+ end
+ else
+ remove_column(column)
+ end
+ end
+
+ table_id = database[table_name]
+ return if table_id.nil?
+
+ database.each_raw do |id, cursor|
+ next if ID.builtin?(id)
+ next if id == table_id
+
+ context.open_temporary(id) do |object|
+ if object.nil?
+ context.clear_error
+ next
+ end
+
+ case object
+ when Table
+ if object.domain_id == table_id
+ begin
+ object.remove(:dependent => @dependent)
+ rescue
+ context.clear_error
+ reference_table_name = object.name
+ object.close
+ remove_table_force(reference_table_name)
+ end
+ end
+ when Column
+ if object.range_id == table_id
+ remove_column(object)
+ end
+ end
+ end
+ end
+
+ Object.remove_force(table_name)
+ end
+
+ def remove_column(column)
+ begin
+ column.remove(:dependent => @dependent)
+ rescue
+ context.clear_error
+ column_name = column.name
+ column.close
+ remove_column_force(column_name)
+ end
+ end
+
+ def remove_column_force(column_name)
+ database = context.database
+
+ column_id = database[column_name]
+
+ column = context[column_id]
+ if column.nil?
+ context.clear_error
+ else
+ column.index_ids.each do |id|
+ index_column = context[id]
+ if index_column.nil?
+ context.clear_error
+ index_column_name = database[id]
+ remove_column_force(index_column_name)
+ else
+ remove_column(index_column)
+ end
+ end
+ column.close
+ end
+
+ Object.remove_force(column_name)
+ end
+
+ def remove_referenced_tables(shard, referenced_table_ids)
+ return if referenced_table_ids.empty?
+
+ database = context.database
+ shard_suffix = shard.range_data.to_suffix
+ referenced_table_ids.uniq.each do |referenced_table_id|
+ referenced_table_name = database[referenced_table_id]
+ next if referenced_table_name.nil?
+ next unless referenced_table_name.end_with?(shard_suffix)
+
+ referenced_table = context[referenced_table_id]
+ if referenced_table.nil?
+ context.clear_error
+ if @force
+ Object.remove_force(referenced_table_name)
+ end
+ next
+ end
+
+ if @force
+ begin
+ referenced_table.remove(:dependent => @dependent)
+ rescue
+ context.clear_error
+ referenced_table.close
+ remove_table_force(referenced_table_name)
+ end
+ else
+ referenced_table.remove(:dependent => @dependent)
+ end
+ end
+ end
+
+ def remove_records(table)
+ expression = nil
+
+ begin
+ expression = Expression.create(table)
+ yield(expression)
+ table.delete(:expression => expression)
+ ensure
+ expression.close if expression
+ end
+ end
+ end
+ end
+end
diff --git a/storage/mroonga/vendor/groonga/plugins/sharding/parameters.rb b/storage/mroonga/vendor/groonga/plugins/sharding/parameters.rb
new file mode 100644
index 00000000..b09a9d6c
--- /dev/null
+++ b/storage/mroonga/vendor/groonga/plugins/sharding/parameters.rb
@@ -0,0 +1,10 @@
+module Groonga
+ module Sharding
+ module Parameters
+ @range_index = :auto
+ class << self
+ attr_accessor :range_index
+ end
+ end
+ end
+end
diff --git a/storage/mroonga/vendor/groonga/plugins/sharding/range_expression_builder.rb b/storage/mroonga/vendor/groonga/plugins/sharding/range_expression_builder.rb
new file mode 100644
index 00000000..cc80735d
--- /dev/null
+++ b/storage/mroonga/vendor/groonga/plugins/sharding/range_expression_builder.rb
@@ -0,0 +1,88 @@
+module Groonga
+ module Sharding
+ class RangeExpressionBuilder
+ attr_reader :match_columns_expression
+
+ attr_writer :match_columns
+ attr_writer :query
+ attr_writer :filter
+
+ def initialize(key, target_range)
+ @key = key
+ @target_range = target_range
+ @match_columns_expression = nil
+ @match_columns = nil
+ @query = nil
+ @filter = nil
+ end
+
+ def build_all(expression)
+ build_condition(expression)
+ end
+
+ def build_partial_min(expression)
+ expression.append_object(@key, Operator::PUSH, 1)
+ expression.append_operator(Operator::GET_VALUE, 1)
+ expression.append_constant(@target_range.min, Operator::PUSH, 1)
+ if @target_range.min_border == :include
+ expression.append_operator(Operator::GREATER_EQUAL, 2)
+ else
+ expression.append_operator(Operator::GREATER, 2)
+ end
+ build_condition(expression)
+ end
+
+ def build_partial_max(expression)
+ expression.append_object(@key, Operator::PUSH, 1)
+ expression.append_operator(Operator::GET_VALUE, 1)
+ expression.append_constant(@target_range.max, Operator::PUSH, 1)
+ if @target_range.max_border == :include
+ expression.append_operator(Operator::LESS_EQUAL, 2)
+ else
+ expression.append_operator(Operator::LESS, 2)
+ end
+ build_condition(expression)
+ end
+
+ def build_partial_min_and_max(expression)
+ between = Groonga::Context.instance["between"]
+ expression.append_object(between, Operator::PUSH, 1)
+ expression.append_object(@key, Operator::PUSH, 1)
+ expression.append_operator(Operator::GET_VALUE, 1)
+ expression.append_constant(@target_range.min, Operator::PUSH, 1)
+ expression.append_constant(@target_range.min_border,
+ Operator::PUSH, 1)
+ expression.append_constant(@target_range.max, Operator::PUSH, 1)
+ expression.append_constant(@target_range.max_border,
+ Operator::PUSH, 1)
+ expression.append_operator(Operator::CALL, 5)
+ build_condition(expression)
+ end
+
+ private
+ def build_condition(expression)
+ if @query
+ is_empty = expression.empty?
+ if @match_columns
+ table = Context.instance[expression[0].domain]
+ @match_columns_expression = Expression.create(table)
+ @match_columns_expression.parse(@match_columns)
+ end
+ flags = Expression::SYNTAX_QUERY |
+ Expression::ALLOW_PRAGMA |
+ Expression::ALLOW_COLUMN
+ expression.parse(@query,
+ default_column: @match_columns_expression,
+ flags: flags)
+ expression.append_operator(Operator::AND, 2) unless is_empty
+ end
+
+ if @filter
+ is_empty = expression.empty?
+ expression.parse(@filter)
+ expression.append_operator(Operator::AND, 2) unless is_empty
+ end
+ end
+ end
+ end
+end
diff --git a/storage/mroonga/vendor/groonga/plugins/sharding/sources.am b/storage/mroonga/vendor/groonga/plugins/sharding/sources.am
new file mode 100644
index 00000000..df2b6d02
--- /dev/null
+++ b/storage/mroonga/vendor/groonga/plugins/sharding/sources.am
@@ -0,0 +1,10 @@
+sharding_scripts = \
+ logical_count.rb \
+ logical_enumerator.rb \
+ logical_parameters.rb \
+ logical_range_filter.rb \
+ logical_select.rb \
+ logical_shard_list.rb \
+ logical_table_remove.rb \
+ parameters.rb \
+ range_expression_builder.rb