並行実行制御における競合状態の修正
Solid Queueの並行実行制御機構で、ジョブ完了時のブロック解除処理が新たに作成中のBlockedExecutionを見逃す競合状態が修正されました。この問題により、ブロックされたジョブが定期メンテナンスタスクの実行(最大で数分後)まで滞留する事象が発生していました。
背景
#456 で報告された問題は、並行実行数の上限に達していないにもかかわらず、ジョブが不必要にブロックされる現象でした。あるジョブが完了してから30分以上経過した後に、次のジョブがようやく実行される事例が観測されています。limits_concurrency to: 1 の設定で、先行ジョブが100ミリ秒で完了しているにもかかわらず、後続ジョブが30分後まで実行されない状況が発生しました。
この問題の根本原因は、ジョブのエンキュー処理とブロック解除処理の間で発生する競合状態にありました。Semaphore::Proxy#wait がセマフォの値を確認する際に行ロックを取得しないため、並行して実行される signal 処理との間にタイミングの問題が生じていました。
競合状態の発生メカニズム
問題は以下の手順で発生します:
- ジョブAが実行中(セマフォ値=0)
- ジョブBのエンキュー開始:セマフォを読み取り(値=0、行ロックなし)→ブロックを決定
- ジョブAが完了:
Semaphore.signal→ 値を1にUPDATE(ロックがないため即座に成功) - ジョブA:
BlockedExecution.release_one→SELECTで何も見つからない(ジョブBのBlockedExecutionはまだコミットされていない) - ジョブBのエンキューがコミット:
BlockedExecutionが作成されるが、もう誰もブロック解除しない
この競合ウィンドウは、エンキュー処理がセマフォの確認からトランザクションのコミットまでの間に、並行する signal が完了してしまうことで発生します。release_one が実行される時点では、BlockedExecution のレコードがまだトランザクション内にあるため、可視性がありません。
技術的な変更
app/models/solid_queue/semaphore.rb の Semaphore::Proxy#wait メソッドが修正され、セマフォの確認時に FOR UPDATE による行ロックを取得するようになりました。
変更前:
def wait
if semaphore = Semaphore.find_by(key: key)
semaphore.value > 0 && attempt_decrement
else
attempt_creation
end
end
変更後:
def wait
if semaphore = Semaphore.lock.find_by(key: key)
semaphore.value > 0 && attempt_decrement
else
attempt_creation
end
end
.lock の追加により、エンキュー処理がセマフォの確認から BlockedExecution の作成とコミットまでの間、セマフォ行のロックを保持します。これにより、並行する signal の UPDATE は待機を強制され、release_one が実行される時点で BlockedExecution の可視性が保証されます。
新たに追加された test/models/solid_queue/semaphore_test.rb には、この修正を検証するテストが含まれています。wait acquires a row lock that blocks concurrent signal テストでは、FOR UPDATE ロックが保持されている間、並行する UPDATE が実際にブロックされることを確認しています。別スレッドでセマフォ行の FOR UPDATE ロックを保持し、メインスレッドでの UPDATE が最低0.5秒間ブロックされることを検証します。
デッドロックの考察
PR本文では、この変更がデッドロックを引き起こさないことが説明されています。2つの処理パスにおけるロック取得順序を分析すると、循環依存が発生しないことが確認できます:
-
エンキューパス:
Semaphore行をロック →BlockedExecutionをINSERT(既存行のロックなし) -
release_oneパス:BlockedExecution行をロック(SKIP LOCKED)→Semaphore行をロック(release内のwaitを経由)
BlockedExecution のロック取得に SKIP LOCKED が使用されているため、複数のワーカーが同じブロック済みジョブを取得しようとして競合することはありません。また、エンキューパスは既存の BlockedExecution 行をロックしないため、release_one パスとの間で循環依存は発生しません。
まとめ
本PRは、セマフォの確認時に FOR UPDATE を追加することで、並行実行制御の競合状態を解決しました。この1行の変更により、エンキュー処理がトランザクション全体でセマフォ行のロックを保持し、signal 処理との原子性が保証されます。ロック取得順序の設計により、新たなデッドロックのリスクを導入することなく、ブロックされたジョブが確実に解除される仕組みが実現されています。