NettyのChannelInboundHandlerとChannelOutboundHandleについてまとめる
はじめに
Netty In Actionを読んでいて、それぞれがのハンドラーがどのどのタイミングで実行されるかが分かりづらかったので、自身の頭の中の整理を目的に自分の理解をまとめて見ようと思います。
Inbound? OutBound?
ChannelPipelineでは、ソケットとアプリケーションを両端として、それぞれの方向に流れるデータフローをInboundとOutBoundと呼んでいます。
- ソケット⇨アプリケーション : Inbound
- アプリケーション⇨ソケット: Outbound
また、それぞれのデータフローのイベントに対して、ChannelPipelineには別の種類のハンドラーを登録することが可能で、そのハンドラーを実装するためのインターフェースが、ChannelInboundHandlerとChannelOutboundHandlerです。その名の通りそれぞれがソケットに空のInboundとOutboundのイベントで発火されます。
それぞれの方向で基本的に、Hndler間での関係性は無く、ChannelHandlertContextを通してお互いに相互作用はしません。
ChannelInboundHandler
ChannelInboundHandlerのデータに対するイベントで発火されるメソッドが定義されたInterfaceです。具体的には以下のようなものが定義されています。
| メソッド名 | 説明 |
|---|---|
| channelRegistered | EventLoopとChannelHandlertContextがChannelに登録され、何かのI/Oのハンドルが可能となった際に実行される |
| channelUnregistered | EventLoopとChannelHandlertContextがChannelから解除され、なにかのI/Oのハンドルが不可能となった際に実行される |
| channelActive | ChannelのChannelHndlerContextがアクティブ、チャンネルが接続もしくは、バインドされ準備ができた状態になった際に実行される |
| channelInactive | ChannelがActive状態ではなくなった際際に実行される。リモートの通信先と接続がキレている |
| channelReadComplete | readオペレーションが終わった際に実行される |
| channelRead | データがChannelから読み出された際に実行される |
| channelWritabilityChanged | ChannelのWritabilityの状態が変わった際に呼び出される。ユーザは書き込みがが終わっていないことを確認できたり、書き込み可能になった際に書き込みを行なうようにすることができる。その確認を行なうために、Channelのメソッドである、isWritable()メソッドを利用することができる。 |
| userEventTriggered | ChannelnboundHandler.fireUserEventTriggered()が呼ばれた際に実行される。 |
| exceptionCaught | 例外が投げられた際に実行される。 |
例えば以下のようなハンドラーをChannelPipelineに登録します。
@ChannelHandler.Sharable public class EchoInboundHandler extends ChannelInboundHandlerAdapter { @Override public void channelRegistered(ChannelHandlerContext ctx) throws Exception { System.out.println("channelRegistered inbound handler called"); super.channelRegistered(ctx); } @Override public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { System.out.println("channelUnregistered inbound handler called"); super.channelUnregistered(ctx); } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("channelActive inbound handler called"); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { System.out.println("channelInactive inbound handler called"); super.channelInactive(ctx); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("channelRead inbound handler called"); //送信されたデータをコンソールに出力してそのまま返す。 ByteBuf input = (ByteBuf) msg; System.out.println(input.toString(CharsetUtil.UTF_8)); ctx.write(input); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { System.out.println("channelReadComplete inbound handler called"); //読み込みが終了の際にコネクションをクローズする ctx.writeAndFlush(Unpooled.EMPTY_BUFFER) .addListener(ChannelFutureListener.CLOSE); } @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { super.userEventTriggered(ctx, evt); } @Override public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { super.channelWritabilityChanged(ctx); } }
TCPソケットの通信でaaaというデータを送った際には以下のような出力が標準出力にされます。channelActiveまでが、コネクションを確立されるまでに出力され、
channelRegistered inbound handler called channelActive inbound handler called channelRead inbound handler called aaa channelReadComplete inbound handler called channelInactive inbound handler called channelUnregistered inbound handler called
userEventTriggeredやchannelWritabilityChangedなどは、前述の表のようなタイミングで実行されるため、今回のケースでは実行されないようです。
ChannelOutboundHandler
Inboundに対して、ChannelOutboundHandlerに関しては、以下のようなメソッドが定義されています。
| メソッド名 | 説明 |
|---|---|
| bind(ChannelHandlerContext, SocketAddress,ChannelPromise) | ローカルアドレスにバインドがリクエストされた際に実行される |
| connect(ChannelHandlerContext, SocketAddress,SocketAddress,ChannelPromise) | リモートの通信先にChannelと接続のリクエストの際に実行される |
| disconnect(ChannelHandlerContext, ChannelPromise) | Channelのリモートの通信先とのコネクションを切断するリクエストを送る際に実行されます |
| close(ChannelHandlerContext,ChannelPromise) | Channelのクローズのリクエストの際に |
| disconnect(ChannelHandlerContext, ChannelPromise) | EventLoopからChannelの登録解除がリクエストされた際に実行されます。 |
| read(ChannelHandlerContext) | Channelのリードがリクエストされた際に実行されます |
| flush(ChannelHandlerContext) | Channelのキューに溜まったデータをフラッシュのリクエストの際に実行されます |
| write(ChannelHandlerContext,Object, ChannelPromise | Channelを通して書き込みがリクエストされる際に実行されます |
例えば以下の2つのようなハンドラーをChannelPipelineに登録します。
@ChannelHandler.Sharable public class EchoOutBoundHandler extends ChannelOutboundHandlerAdapter { @Override public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception { System.out.println("bind outbound handler called"); super.bind(ctx, localAddress, promise); } @Override public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) throws Exception { System.out.println("connect outbound handler called"); super.connect(ctx, remoteAddress, localAddress, promise); } @Override public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { System.out.println("disconnect outbound handler called"); super.disconnect(ctx, promise); } @Override public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { System.out.println("close outbound handler called"); super.close(ctx, promise); } @Override public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { System.out.println("deregister outbound handler called"); super.deregister(ctx, promise); } @Override public void read(ChannelHandlerContext ctx) throws Exception { System.out.println("read outbound handler called"); super.read(ctx); } @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { System.out.println("write outbound handler called"); super.write(ctx, msg, promise); } @Override public void flush(ChannelHandlerContext ctx) throws Exception { System.out.println("flush outbound handler called"); super.flush(ctx); } }
public class EchoClient extends SimpleChannelInboundHandler<ByteBuf> { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { // 通信先にhelloのメッセージを送信 ctx.writeAndFlush(Unpooled.copiedBuffer("hello", CharsetUtil.UTF_8)); super.channelActive(ctx); } @Override protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { // 通信先からのレスポンスを表示 System.out.println(msg.toString(CharsetUtil.UTF_8)); } }
その状態でNettyをクライアントとして実行し、通信を行なうとコンソールに以下のような出力が行われます。
connect outbound handler called write outbound handler called flush outbound handler called read outbound handler called hello read outbound handler called read outbound handler called