Action Cable のサーバー層アダプタ化:低レベル処理とアプリケーション層の分離
Action Cable の内部アーキテクチャが大幅に再編され、WebSocket処理や並行制御などの低レベル実装と、ChannelやConnectionといったアプリケーション層の抽象が明確に分離されました。これにより、ユーザーコードを変更せずに代替サーバー実装や実行モデルを利用できる拡張性の基盤が整います。
背景
これまでの Action Cable では、ActionCable::Connection::Base が WebSocket イベントリスナーの設定、Rails Executor のラッピング、ユーザーコードの実行という異なる責務を一手に担っていました。この密結合により、非WebSocket環境やユニットテストでの再利用が困難であり、代替サーバー(Iodine、SSEなど)との統合には内部へのモンキーパッチが必要でした。同様の問題意識は rails/rails#27648 でも取り上げられており、本PRはその目標を改めて実現するものです。
結果として、ActionCableをライブラリとして他の実行モデル(Fiberベースの並行処理など)から活用することが現実的ではありませんでした。
技術的な変更
Connection::Base と Server::Socket の分割
最も大きな変更は、ActionCable::Connection::Base からWebSocket固有の処理を ActionCable::Server::Socket として切り出したことです。Server::Socket がWebSocket接続の低レベル処理(エンコード/デコード、メッセージバッファリング、プロトコルハンドシェイク)を担い、Connection::Base はアプリケーション層の純粋な抽象として残ります。
変更前:
def initialize(server, env, coder: ActiveSupport::JSON)
@server, @env, @coder = server, env, coder
@worker_pool = server.worker_pool
@logger = new_tagged_logger
@websocket = ActionCable::Connection::WebSocket.new(env, self, event_loop)
@subscriptions = ActionCable::Connection::Subscriptions.new(self)
@message_buffer = ActionCable::Connection::MessageBuffer.new(self)
...
end
変更後:
def initialize(server, socket)
@server = server
@socket = socket
@logger = socket.logger
@subscriptions = Subscriptions.new(self)
...
end
Connection::Base は socket オブジェクトへの委譲(env、request、protocol、perform_work)と server への委譲(pubsub、executor、config)を通じて動作します。サーバー側では call メソッドが直接 Socket.new を呼び出すよう変更されています。
def call(env)
return config.health_check_application.call(env) if env["PATH_INFO"] == config.health_check_path
setup_heartbeat_timer
Socket.new(self, env).process # 変更前: config.connection_class.call.new(self, env).process
end
ThreadedExecutor の独立
ActionCable::Server::ThreadedExecutor が新設され、これまで StreamEventLoop 内に隠蔽されていた Concurrent::ThreadPoolExecutor が独立したサーバー属性として昇格しました。インターフェースは #post、#timer、#shutdown の3メソッドのみに絞られています。
class ThreadedExecutor
def initialize(max_size: 10, name: "server")
@executor = Concurrent::ThreadPoolExecutor.new(
name: "ActionCable-#{name}",
min_threads: 1,
max_threads: max_size,
max_queue: 0,
)
end
def post(task = nil, &block) = @executor << (task || block)
def timer(interval, &block) = Concurrent::TimerTask.new(execution_interval: interval, &block).tap(&:execute)
def shutdown = @executor.shutdown
end
これに伴い Configuration に executor_pool_size(デフォルト10)が追加されました。また StreamEventLoop から timer/post メソッドが削除され、IO監視専用の役割に整理されています。
SubscriberMap::Async の標準化
非同期pub/sub処理 がアダプター層で統一されました。以前は各アダプター(Async、PostgreSQL、Redis)がそれぞれ独自に event_loop.post を呼び出していましたが、SubscriberMap::Async サブクラスに集約されました。
class Async < self
def initialize(executor)
@executor = executor
super()
end
def add_subscriber(*) = @executor.post { super }
def remove_subscriber(*) = @executor.post { super }
def invoke_callback(*) = @executor.post { super }
end
これにより stream_from での二重非同期呼び出し(Channelレイヤーの event_loop.post + アダプター内の非同期処理)が解消されました。
ファイル移動と削除
低レベル実装クラスが ActionCable::Connection 名前空間から ActionCable::Server::Socket 配下に移動されました。
| 変更前 | 変更後 |
|---|---|
Connection::ClientSocket |
Server::Socket::ClientSocket |
Connection::WebSocket |
Server::Socket::WebSocket |
Connection::Stream |
Server::Socket::Stream |
Connection::MessageBuffer |
Server::Socket::MessageBuffer |
Connection::StreamEventLoop |
Server::StreamEventLoop |
Connection::TaggedLoggerProxy |
Server::TaggedLoggerProxy |
Subscriptions のエラーハンドリング改善
Subscriptions クラスに専用の例外クラス群が追加されました。以前はエラーログの書き出しにとどまっていた異常系が、呼び出し元でハンドリングできる例外として整理されています。
追加された例外クラスは以下のとおりです:
-
AlreadySubscribedError— 同一チャンネルへの重複サブスクライブ -
ChannelNotFound— 存在しないチャンネルID -
MalformedCommandError— 不正なコマンドデータ -
UnknownCommandError— 未知のコマンド -
UnknownSubscription— 該当するサブスクリプションが存在しない
テストインフラの改善
テスト用の ConnectionStub/ChannelStub が廃止され、実際のクラスを使用するアーキテクチャに移行されました。TestSocket クラスが新設され、テスト環境でも本番と同じ Connection::Base と Channel クラスがそのまま動作します。advance_time ヘルパーも追加され、定期タイマーのテストが可能になっています。また、各サブスクリプションアダプターが @server.mutex ではなく独自の @mutex を持つように変更されており、サーバーオブジェクトへの依存が軽減されています。
設計判断
後方互換性を保ちながら内部アーキテクチャを刷新する という方針が一貫して採られています。PRは「本PRはパブリックAPIの破壊的変更を含まない」と明示しており、ユーザー向けの ApplicationCable::Connection や ApplicationCable::Channel の記述方法は変わりません。
_Socket・_Executor・_PubSub・_Connection といったインターフェース(PRのクラス図参照)を定義することで、将来の代替実装が準拠すべき契約を明確にしています。ThreadedExecutor の最小インターフェース(#post/#timer/#shutdown)もこの思想を体現しており、Fiberベースのエグゼキュータへの差し替えなどを将来のPRで導入しやすくしています。ただし、config.action_cable.async_executor のような設定キーは本PRには含まれておらず、フォローアップに委ねられています。
各アダプターが @server.mutex を共用していた点の改善も注目に値します。Inline・PostgreSQL・Redis の各アダプターがそれぞれ独自の @mutex を持つようになったことで、サーバーオブジェクトへの不必要な依存が排除されています。これはアダプター単体での利用可能性を高める変更です。
まとめ
本PRは、Action Cable の「WebSocket処理とアプリケーションロジックの分離」という長年の課題(rails/rails#27648 からの継続)に対する実質的な回答です。Server::Socket・ThreadedExecutor・SubscriberMap::Async という3つの明確な抽象化を導入することで、既存ユーザーコードへの影響を最小化しつつ、Iodineのネイティブpub/sub活用やSSEトランスポートの実装といった代替実装の土台が整いました。