MirrorService#mirror の exist? チェックとアップロードを並列化
MirrorService#mirror における exist? チェックとアップロードが、既存のスレッドプールを活用して並列実行されるようになりました。これにより、N台のミラーに対するwall timeがO(N)からO(1)のネットワークラウンドトリップに短縮されます。
背景
MirrorService#mirror は、ファイルをプライマリサービスから各ミラーへコピーする処理を順次(O(N))実行していました。一方、delete や delete_prefixed の処理では @executor スレッドプールがすでに活用されていましたが、mirror メソッドはこのプールを一切使用していませんでした。
PRの説明に示されたベンチマーク(クラウドレイテンシ50ms/callを模擬、5回の中央値)によると、逐次実行と並列実行の差は以下のとおりです:
| ミラー数 | 逐次実行 | 並列実行 | 高速化 |
|---|---|---|---|
| 2 | 100ms | 50ms | 約2倍 |
| 3 | 150ms | 50ms | 約3倍 |
| 5 | 250ms | 50ms | 約5倍 |
ボトルネックはネットワークのラウンドトリップであり、ファイルサイズ(50KB〜20MB)は影響しないことも確認されています。
技術的な変更
mirror メソッドが Concurrent::Promise を用いた2段階の並列処理に刷新され、exist? チェックとアップロードの両フェーズが並列実行されます。
変更前:
def mirror(key, checksum:)
instrument :mirror, key: key, checksum: checksum do
if (mirrors_in_need_of_mirroring = mirrors.select { |service| !service.exist?(key) }).any?
primary.open(key, checksum: checksum, verify: checksum.present?) do |io|
mirrors_in_need_of_mirroring.each do |service|
io.rewind
service.upload key, io, checksum: checksum
end
end
end
end
end
変更後:
def mirror(key, checksum:)
instrument :mirror, key: key, checksum: checksum do
mirrors_in_need_of_mirroring = mirrors_needing_mirroring(key)
if mirrors_in_need_of_mirroring.any?
primary.open(key, checksum: checksum, verify: checksum.present?) do |io|
io.rewind
content = io.read.freeze
tasks = mirrors_in_need_of_mirroring.map do |service|
Concurrent::Promise.execute(executor: @executor) do
service.upload key, StringIO.new(content), checksum: checksum
end
end
tasks.each(&:value!)
end
end
end
end
private
def mirrors_needing_mirroring(key)
tasks = mirrors.map do |service|
[ service, Concurrent::Promise.execute(executor: @executor) { service.exist?(key) } ]
end
tasks.reject { |_, promise| promise.value! }.map(&:first)
end
スレッドセーフ性の確保には2つの工夫が施されています。第一に、ファイルの内容を io.read.freeze で一度だけ読み込んで凍結した文字列として保持し、各ミラーへのアップロード時には StringIO.new(content) で個別のIOオブジェクトを生成します。これにより、複数スレッドが同一のIOポインタを競合して操作する問題を回避しています。第二に、primary.open のブロック冒頭で io.rewind を呼び出すことで、Tempfileが EOF 状態で yield された場合でも正しくファイル先頭から読み込めるよう保護しています。
mirrors_needing_mirroring ヘルパーメソッドの抽出により、exist? チェックフェーズも @executor プールで並列化されています。promise.value! は処理の完了を待機するとともに、いずれかのPromiseが例外で終了した場合にその例外を呼び出し元へ伝播させます。
設計判断
ファイル内容を一度だけ読んで凍結文字列として共有する方式が採用されました。
並列アップロードの実装として、各スレッドが独自にプライマリストレージを読み直す方式も考えられますが、それではネットワークI/Oが増加します。本PRでは内容を1回だけ読み込んで freeze した文字列を共有し、各スレッドへは StringIO.new(content) で独立したIOオブジェクトを渡すことで、読み込みコストを抑えつつスレッドセーフを実現しています。
exist? チェックとアップロードの2フェーズを明確に分離したことも、設計上の判断として読み取れます。exist? チェックの結果をすべて待機してからアップロードを開始することで、不要なアップロードを確実に防ぎつつ、各フェーズ内では並列実行を最大化しています。また、@executor はすでに mirrors.size にサイズされているため、スレッドプールの追加設定なしに適切な並列度が確保されます。
まとめ
本PRは、MirrorService に既存のスレッドプールが未活用だった mirror メソッドへの並列処理を導入した変更です。スレッドセーフのための凍結文字列と独立したStringIOという設計により、ミラー数に比例して増加していたレイテンシをO(1)に抑え、クラウドストレージのマルチミラー構成における実用的な高速化を実現しています。