Rubyで最小ヒープ
2022-12-25追記
この記事で自前実装したものとは別にAlgorithmsと言うgemを見つけた。
使いたいデータ構造があったらこれを使うのがよさげ。
2023-01-05追記
Algorithms gemのMinHeapはなんだか重い。
中身をちょっと見たらノードをNodeクラスで実装してるのでけっこうオーバーヘッドがある模様。
なのでパフォーマンスが必要なところでは使えない。
優先度付きキューとして使うための最小ヒープ。
常に最小の値が先頭に来るため、最小の値を素早く取り出すことができるデータ構造。計算量はO(1)と極めて軽い。
pushで追加して、popで取り出し。
pushした場合もpopした場合も都度ヒープを再構築する必要があるが、その計算量はO(log n)と軽い。
単純に配列から最小を求める場合はソートして一番先頭の値を取ることになるが、高速なマージソートやクイックソートでもO(n log n)かかるので、再構築の時間を含めても最小ヒープの方がかなり有利。
使い方
入れるのが数値(優先度)だけの場合。
heap = Heap.new
heap.push 5
heap.push 2
heap.push 4
heap.push 1
heap.pop # => 1
heap.pop # => 2
heap.pop # => 4
heap.pop # => 5
heap.pop # => nil
数値(優先度)と何らかのデータ入れる場合。要素の配列は最初の要素が値で2番目以降は任意のデータ。
heap = Heap.new
heap.push [5, "A"]
heap.push [2, "B"]
heap.push [4, "C"]
heap.push [1, "D"]
heap.pop # => [1, "D"]
heap.pop # => [2, "B"]
heap.pop # => [4, "C"]
heap.pop # => [5, "A"]
heap.pop # => nil
コード
# 配列を使った最小ヒープを実装
# ヒープの各要素には値と付随する情報を保持することが可能
class Heap
def initialize
@array = []
# 配列の要素は配列
# 要素の最初の要素が値
# @array => [[1], [2], [3]]
# @array => [[1, 'key1'], [2, 'key2'], [3, 'key3']]
end
def size
@array.size
end
def min
top
end
def top
min = @array.first
min.size == 1 ? min[0] : min
end
def push(item)
item.instance_of?(Array) ? @array.push(item) : @array.push([item])
current_index = @array.size - 1
# 末尾から木の再構築
rebuild_bottom_to_top(current_index)
end
def pop
# 配列の先頭がヒープのルートノード(最小値)なので、それを返すために保持
min = @array.shift
return nil if min.nil? # 要素がない場合はnilを返す
# ヒープを再構築
rebuild_top_to_bottom
# 再構築が終わってから返す
min.size == 1 ? min[0] : min
end
private
# ヒープからpopしたときは末尾の値を先頭に移動して、そこから再構築
def rebuild_top_to_bottom
# 要素がない場合は再構築の必要なし
return if @array.empty?
# 先頭に末尾の値を持ってくる
@array.unshift(@array.pop)
current_index = 0
Kernel.loop do
left_child_index, right_child_index = child_indexes_for(current_index)
# 左側の子がなければそれ以上子はいないので再構築終了
break if @array[left_child_index].nil?
# 左右の子のより小さい方を入れ替え候補に
smaller_child_index =
pick_smaller_child_index(left_child_index, right_child_index)
# 子の方が小さければ入れ替え 入れ替えがなければ終了
break if @array[smaller_child_index][0] >= @array[current_index][0]
switch_item(current_index, smaller_child_index)
current_index = smaller_child_index # インデックスの位置も更新
end
end
def pick_smaller_child_index(left_child_index, right_child_index)
if !@array[right_child_index].nil? &&
@array[right_child_index][0] < @array[left_child_index][0]
right_child_index
else
left_child_index
end
end
# ヒープにpushしたときは追加された値を末尾に置いて、そこから再構築
def rebuild_bottom_to_top(current_index)
Kernel.loop do
break if current_index.zero?
# 親と値を比較して、親の方が大きければ親と位置を入れ替える
parent_index = parent_index_for(current_index)
# p "current_index: #{current_index}, #{@array[current_index][0]}"
# p "parent_index: #{parent_index}, #{@array[parent_index][0]}"
# 入れ替えがなければ再構築終了
break if @array[current_index][0] >= @array[parent_index][0]
switch_item(current_index, parent_index)
current_index = parent_index # インデックスの位置も更新
end
end
def parent_index_for(current_index)
(current_index - 1) / 2
# 0 => -1
# 1 -> 0 => 3, 4
# 2 -> 0 => 5, 6
# 3 -> 1 => 7, 8
# 4 -> 1 => 9, 10
# 5 -> 2
# 6 -> 2
# 7 -> 3
# 8 -> 3
end
def child_indexes_for(current_index)
left_index = current_index * 2 + 1
right_index = current_index * 2 + 2
[left_index, right_index]
end
def switch_item(index1, index2)
@array[index1], @array[index2] = @array[index2], @array[index1]
end
end
テストコード
require "./heap"
require "minitest/autorun"
class TestHeap < MiniTest::Unit::TestCase
def setup
@heap = Heap.new
end
def teardown
@heap = nil
end
def test_push_value
@heap.push(5)
assert_equal @heap.pop, 5
end
def test_pop_min_value
@heap.push(5)
@heap.push(1)
assert_equal @heap.pop, 1
end
def test_push_value_after_pop
@heap.push(5)
@heap.pop
@heap.push(1)
assert_equal @heap.pop, 1
end
def test_pop_always_min_value
@heap.push(5)
@heap.push(1)
@heap.push(3)
@heap.push(2)
@heap.push(7)
@heap.push(6)
@heap.push(4)
pop_items = []
while item = @heap.pop
pop_items.push(item)
end
assert_equal pop_items, [1, 2, 3, 4, 5, 6, 7]
end
def test_pop_nil_from_empty_heap
assert_nil @heap.pop
end
def test_push_item
@heap.push([5, "A"])
assert_equal @heap.pop, [5, "A"]
end
def test_pop_always_min_item
@heap.push([5, "A"])
@heap.push([1, "B"])
@heap.push([3, "C"])
@heap.push([2, "D"])
pop_items = []
while item = @heap.pop
pop_items.push(item)
end
assert_equal pop_items, [[1, "B"], [2, "D"], [3, "C"], [5, "A"]]
end
def test_heap_contains_same_number
@heap.push([5, "A"])
@heap.push([1, "B"])
@heap.push([3, "C"])
@heap.push([2, "D"])
@heap.push([5, "E"])
@heap.push([1, "F"])
pop_items = []
while item = @heap.pop
pop_items.push(item)
end
assert_equal pop_items,
[[1, "B"], [1, "F"], [2, "D"], [3, "C"], [5, "A"], [5, "E"]]
end
end