サブスクライブ直後のメッセージがスキップされるレースコンディションを修正
@last_id の遅延初期化が引き起こすレースコンディションを修正し、サブスクライブ直後にブロードキャストされたメッセージが欠落する問題を解消しました。
背景
Solid Cable のリスナーには、サブスクライブとメッセージポーリングの間にブロードキャストが割り込むと、そのメッセージが永続的に失われるレースコンディションが存在していました。問題の根本は @last_id の遅延初期化にあります。
@last_id はリスナースレッドが初めて broadcast_messages を呼び出したタイミングで MAX(id) として評価されていました。サブスクライブ後、リスナースレッドが最初のポーリングサイクルに到達するまでの間にブロードキャストが発生すると、そのメッセージの id が MAX(id) の評価結果に含まれてしまいます。その結果、後続のクエリ WHERE id > last_id がそのメッセージを対象外として除外し、メッセージが完全に失われます。
PRに示されている再現シナリオは以下のとおりです:
Main thread Listener thread
────────── ───────────────
subscribe()
Listener.new → starts thread → thread starts, enters listen loop
add_channel:
channels["test"] = MAX(id) = 5
event_loop.post(on_success)
interruptible { executor.run! }
on_success fires → subscribed.set
subscribed.wait returns
↑ still hasn't reached broadcast_messages
broadcast("hello") → inserts id=6
broadcast_messages:
@last_id ||= MAX(id) → 6 ← INCLUDES THE MESSAGE
broadcastable(["test"], 6)
→ WHERE id > 6 → nothing returned!
message 6 is SKIPPED
チャネルごとのカーソル(channels["test"] = 5)はこの問題を補えません。このカーソルはクエリ結果のループ内でセカンダリフィルタとして機能するにすぎず、WHERE id > last_id で既に除外されたメッセージには作用しないためです。
技術的な変更
@last_id の初期化をリスナーのコンストラクタに移動することで、ポーリング開始前に確定した値を保持するよう変更されました。
変更前:
attr_writer :last_id
attr_accessor :reconnect_attempt
def last_id
@last_id ||= last_message_id
end
変更後:
def initialize(event_loop)
# ...
@reconnect_attempt = 0
@last_id = last_message_id # ← コンストラクタで即時評価
@thread = Thread.new do
# ...
end
end
# private section
attr_accessor :last_id, :reconnect_attempt
||= による遅延評価を廃止し、initialize 内で last_message_id(= SolidCable::Message.maximum(:id) || 0)を直接代入します。attr_writer :last_id と遅延評価のラッパーメソッド last_id は不要になるため削除され、attr_accessor :last_id に一本化されています。
この変更に対応して、テスト側でも subscribe 直後の sleep が削除されました。この sleep はリスナースレッドが最初のポーリングサイクルを完了するまで待機させるためのものでしたが、コンストラクタで @last_id が確定するようになったことで不要になっています。
subscribed.wait(WAIT_WHEN_EXPECTING_EVENT)
# sleep WAIT_WHEN_EXPECTING_EVENT ← 削除
assert_predicate subscribed, :set?
設計判断
コンストラクタでの DB 読み取りというトレードオフを受け入れ、正確性を優先する設計が選択されました。
PR著者自身が言及しているように、コンストラクタ内に DB 読み取りを置くことはやや異例です。ただし、Listener はサブスクライブ時に遅延初期化されるため起動時のオーバーヘッドにはならず、リスナースレッドおよび #subscribe の呼び出し元はいずれも DB への読み書きをすぐに行う前提であるため、接続は既に確立済みです。
代替案として @last_id の評価タイミングを subscribe 側で制御する方法も考えられますが、それではリスナースレッドの初期化とチャネル登録の間のウィンドウを塞ぐことができず、根本的な解決にはなりません。コンストラクタで評価することで、スレッドが起動する以前に last_id の基準点が確定し、その後のブロードキャストはいかなるタイミングであっても取りこぼされない保証が得られます。
まとめ
遅延評価から即時評価への一行の変更で、サブスクライブとポーリング開始の間に生じるレースコンディションが根本解決されました。テストから sleep が除去されたことは、この修正が競合状態を正しく排除できていることの直接的な証左です。