Rubyで最小ヒープ

2022-12-25追記

この記事で自前実装したものとは別にAlgorithmsと言うgemを見つけた。
使いたいデータ構造があったらこれを使うのがよさげ。

2023-01-05追記

Algorithms gemのMinHeapはなんだか重い。
中身をちょっと見たらノードをNodeクラスで実装してるのでけっこうオーバーヘッドがある模様。
なのでパフォーマンスが必要なところでは使えない。


優先度付きキューとして使うための最小ヒープ。

常に最小の値が先頭に来るため、最小の値を素早く取り出すことができるデータ構造。計算量はO(1)と極めて軽い。

pushで追加して、popで取り出し。
pushした場合もpopした場合も都度ヒープを再構築する必要があるが、その計算量はO(log n)と軽い。

単純に配列から最小を求める場合はソートして一番先頭の値を取ることになるが、高速なマージソートやクイックソートでもO(n log n)かかるので、再構築の時間を含めても最小ヒープの方がかなり有利。

使い方

入れるのが数値(優先度)だけの場合。

heap = Heap.new
heap.push 5
heap.push 2
heap.push 4
heap.push 1
heap.pop # => 1
heap.pop # => 2
heap.pop # => 4
heap.pop # => 5
heap.pop # => nil

数値(優先度)と何らかのデータ入れる場合。要素の配列は最初の要素が値で2番目以降は任意のデータ。

heap = Heap.new
heap.push [5, "A"]
heap.push [2, "B"]
heap.push [4, "C"]
heap.push [1, "D"]
heap.pop # => [1, "D"]
heap.pop # => [2, "B"]
heap.pop # => [4, "C"]
heap.pop # => [5, "A"]
heap.pop # => nil

コード

# 配列を使った最小ヒープを実装
# ヒープの各要素には値と付随する情報を保持することが可能
class Heap
  def initialize
    @array = []
    # 配列の要素は配列
    # 要素の最初の要素が値
    # @array => [[1], [2], [3]]
    # @array => [[1, 'key1'], [2, 'key2'], [3, 'key3']]
  end

  def size
    @array.size
  end

  def min
    top
  end

  def top
    min = @array.first
    min.size == 1 ? min[0] : min
  end

  def push(item)
    item.instance_of?(Array) ? @array.push(item) : @array.push([item])

    current_index = @array.size - 1

    # 末尾から木の再構築
    rebuild_bottom_to_top(current_index)
  end

  def pop
    # 配列の先頭がヒープのルートノード(最小値)なので、それを返すために保持
    min = @array.shift

    return nil if min.nil? # 要素がない場合はnilを返す

    # ヒープを再構築
    rebuild_top_to_bottom

    # 再構築が終わってから返す
    min.size == 1 ? min[0] : min
  end

  private

  # ヒープからpopしたときは末尾の値を先頭に移動して、そこから再構築
  def rebuild_top_to_bottom
    # 要素がない場合は再構築の必要なし
    return if @array.empty?

    # 先頭に末尾の値を持ってくる
    @array.unshift(@array.pop)

    current_index = 0

    Kernel.loop do
      left_child_index, right_child_index = child_indexes_for(current_index)

      # 左側の子がなければそれ以上子はいないので再構築終了
      break if @array[left_child_index].nil?

      # 左右の子のより小さい方を入れ替え候補に
      smaller_child_index =
        pick_smaller_child_index(left_child_index, right_child_index)

      # 子の方が小さければ入れ替え 入れ替えがなければ終了
      break if @array[smaller_child_index][0] >= @array[current_index][0]

      switch_item(current_index, smaller_child_index)
      current_index = smaller_child_index # インデックスの位置も更新
    end
  end

  def pick_smaller_child_index(left_child_index, right_child_index)
    if !@array[right_child_index].nil? &&
         @array[right_child_index][0] < @array[left_child_index][0]
      right_child_index
    else
      left_child_index
    end
  end

  # ヒープにpushしたときは追加された値を末尾に置いて、そこから再構築
  def rebuild_bottom_to_top(current_index)
    Kernel.loop do
      break if current_index.zero?

      # 親と値を比較して、親の方が大きければ親と位置を入れ替える
      parent_index = parent_index_for(current_index)
      # p "current_index: #{current_index}, #{@array[current_index][0]}"
      # p "parent_index: #{parent_index}, #{@array[parent_index][0]}"

      # 入れ替えがなければ再構築終了
      break if @array[current_index][0] >= @array[parent_index][0]

      switch_item(current_index, parent_index)
      current_index = parent_index # インデックスの位置も更新
    end
  end

  def parent_index_for(current_index)
    (current_index - 1) / 2
    # 0 => -1
    # 1 -> 0 => 3, 4
    # 2 -> 0 => 5, 6
    # 3 -> 1 => 7, 8
    # 4 -> 1 => 9, 10
    # 5 -> 2
    # 6 -> 2
    # 7 -> 3
    # 8 -> 3
  end

  def child_indexes_for(current_index)
    left_index = current_index * 2 + 1
    right_index = current_index * 2 + 2

    [left_index, right_index]
  end

  def switch_item(index1, index2)
    @array[index1], @array[index2] = @array[index2], @array[index1]
  end
end

テストコード

require "./heap"
require "minitest/autorun"

class TestHeap < MiniTest::Unit::TestCase
  def setup
    @heap = Heap.new
  end

  def teardown
    @heap = nil
  end

  def test_push_value
    @heap.push(5)
    assert_equal @heap.pop, 5
  end

  def test_pop_min_value
    @heap.push(5)
    @heap.push(1)
    assert_equal @heap.pop, 1
  end

  def test_push_value_after_pop
    @heap.push(5)
    @heap.pop
    @heap.push(1)
    assert_equal @heap.pop, 1
  end

  def test_pop_always_min_value
    @heap.push(5)
    @heap.push(1)
    @heap.push(3)
    @heap.push(2)
    @heap.push(7)
    @heap.push(6)
    @heap.push(4)
    pop_items = []
    while item = @heap.pop
      pop_items.push(item)
    end
    assert_equal pop_items, [1, 2, 3, 4, 5, 6, 7]
  end

  def test_pop_nil_from_empty_heap
    assert_nil @heap.pop
  end

  def test_push_item
    @heap.push([5, "A"])
    assert_equal @heap.pop, [5, "A"]
  end

  def test_pop_always_min_item
    @heap.push([5, "A"])
    @heap.push([1, "B"])
    @heap.push([3, "C"])
    @heap.push([2, "D"])
    pop_items = []
    while item = @heap.pop
      pop_items.push(item)
    end
    assert_equal pop_items, [[1, "B"], [2, "D"], [3, "C"], [5, "A"]]
  end

  def test_heap_contains_same_number
    @heap.push([5, "A"])
    @heap.push([1, "B"])
    @heap.push([3, "C"])
    @heap.push([2, "D"])
    @heap.push([5, "E"])
    @heap.push([1, "F"])
    pop_items = []
    while item = @heap.pop
      pop_items.push(item)
    end
    assert_equal pop_items,
                 [[1, "B"], [1, "F"], [2, "D"], [3, "C"], [5, "A"], [5, "E"]]
  end
end