NettyのChannelInboundHandlerとChannelOutboundHandleについてまとめる

はじめに

Netty In Actionを読んでいて、それぞれがのハンドラーがどのどのタイミングで実行されるかが分かりづらかったので、自身の頭の中の整理を目的に自分の理解をまとめて見ようと思います。

Inbound? OutBound?

ChannelPipelineでは、ソケットとアプリケーションを両端として、それぞれの方向に流れるデータフローをInboundとOutBoundと呼んでいます。

  • ソケット⇨アプリケーション : Inbound
  • アプリケーション⇨ソケット: Outbound

また、それぞれのデータフローのイベントに対して、ChannelPipelineには別の種類のハンドラーを登録することが可能で、そのハンドラーを実装するためのインターフェースが、ChannelInboundHandlerChannelOutboundHandlerです。その名の通りそれぞれがソケットに空のInboundとOutboundのイベントで発火されます。
それぞれの方向で基本的に、Hndler間での関係性は無く、ChannelHandlertContextを通してお互いに相互作用はしません。

ChannelInboundHandler

ChannelInboundHandlerのデータに対するイベントで発火されるメソッドが定義されたInterfaceです。具体的には以下のようなものが定義されています。

メソッド名 説明
channelRegistered EventLoopとChannelHandlertContextがChannelに登録され、何かのI/Oのハンドルが可能となった際に実行される
channelUnregistered EventLoopとChannelHandlertContextがChannelから解除され、なにかのI/Oのハンドルが不可能となった際に実行される
channelActive ChannelのChannelHndlerContextがアクティブ、チャンネルが接続もしくは、バインドされ準備ができた状態になった際に実行される
channelInactive ChannelがActive状態ではなくなった際際に実行される。リモートの通信先と接続がキレている
channelReadComplete readオペレーションが終わった際に実行される
channelRead データがChannelから読み出された際に実行される
channelWritabilityChanged ChannelのWritabilityの状態が変わった際に呼び出される。ユーザは書き込みがが終わっていないことを確認できたり、書き込み可能になった際に書き込みを行なうようにすることができる。その確認を行なうために、Channelのメソッドである、isWritable()メソッドを利用することができる。
userEventTriggered ChannelnboundHandler.fireUserEventTriggered()が呼ばれた際に実行される。
exceptionCaught 例外が投げられた際に実行される。

例えば以下のようなハンドラーをChannelPipelineに登録します。

@ChannelHandler.Sharable
public class EchoInboundHandler extends ChannelInboundHandlerAdapter {


    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channelRegistered inbound handler called");
        super.channelRegistered(ctx);
    }

    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channelUnregistered inbound handler called");
        super.channelUnregistered(ctx);
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channelActive inbound handler called");
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channelInactive inbound handler called");
        super.channelInactive(ctx);
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("channelRead inbound handler called");

        //送信されたデータをコンソールに出力してそのまま返す。
        ByteBuf input = (ByteBuf) msg;
        System.out.println(input.toString(CharsetUtil.UTF_8));
        ctx.write(input);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channelReadComplete inbound handler called");

        //読み込みが終了の際にコネクションをクローズする
        ctx.writeAndFlush(Unpooled.EMPTY_BUFFER)
                .addListener(ChannelFutureListener.CLOSE);
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        super.userEventTriggered(ctx, evt);
    }

    @Override
    public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
        super.channelWritabilityChanged(ctx);
    }

}

TCPソケットの通信でaaaというデータを送った際には以下のような出力が標準出力にされます。channelActiveまでが、コネクションを確立されるまでに出力され、

channelRegistered inbound handler called
channelActive inbound handler called
channelRead inbound handler called
aaa

channelReadComplete inbound handler called
channelInactive inbound handler called
channelUnregistered inbound handler called

userEventTriggeredchannelWritabilityChangedなどは、前述の表のようなタイミングで実行されるため、今回のケースでは実行されないようです。

ChannelOutboundHandler

Inboundに対して、ChannelOutboundHandlerに関しては、以下のようなメソッドが定義されています。

メソッド名 説明
bind(ChannelHandlerContext, SocketAddress,ChannelPromise) ローカルアドレスにバインドがリクエストされた際に実行される
connect(ChannelHandlerContext, SocketAddress,SocketAddress,ChannelPromise) リモートの通信先にChannelと接続のリクエストの際に実行される
disconnect(ChannelHandlerContext, ChannelPromise) Channelのリモートの通信先とのコネクションを切断するリクエストを送る際に実行されます
close(ChannelHandlerContext,ChannelPromise) Channelのクローズのリクエストの際に
disconnect(ChannelHandlerContext, ChannelPromise) EventLoopからChannelの登録解除がリクエストされた際に実行されます。
read(ChannelHandlerContext) Channelのリードがリクエストされた際に実行されます
flush(ChannelHandlerContext) Channelのキューに溜まったデータをフラッシュのリクエストの際に実行されます
write(ChannelHandlerContext,Object, ChannelPromise Channelを通して書き込みがリクエストされる際に実行されます

例えば以下の2つのようなハンドラーをChannelPipelineに登録します。

@ChannelHandler.Sharable
public class EchoOutBoundHandler extends ChannelOutboundHandlerAdapter {

    @Override
    public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception {
        System.out.println("bind outbound handler called");
        super.bind(ctx, localAddress, promise);
    }

    @Override
    public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) throws Exception {
        System.out.println("connect outbound handler called");
        super.connect(ctx, remoteAddress, localAddress, promise);
    }

    @Override
    public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
        System.out.println("disconnect outbound handler called");
        super.disconnect(ctx, promise);
    }

    @Override
    public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
        System.out.println("close outbound handler called");
        super.close(ctx, promise);
    }

    @Override
    public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
        System.out.println("deregister outbound handler called");
        super.deregister(ctx, promise);
    }

    @Override
    public void read(ChannelHandlerContext ctx) throws Exception {
        System.out.println("read outbound handler called");
        super.read(ctx);
    }



    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        System.out.println("write outbound handler called");
        super.write(ctx, msg, promise);
    }

    @Override
    public void flush(ChannelHandlerContext ctx) throws Exception {
        System.out.println("flush outbound handler called");
        super.flush(ctx);
    }
}
public class EchoClient extends SimpleChannelInboundHandler<ByteBuf> {

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        // 通信先にhelloのメッセージを送信        
        ctx.writeAndFlush(Unpooled.copiedBuffer("hello", CharsetUtil.UTF_8));
        super.channelActive(ctx);
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
  // 通信先からのレスポンスを表示
        System.out.println(msg.toString(CharsetUtil.UTF_8));
    }
}

その状態でNettyをクライアントとして実行し、通信を行なうとコンソールに以下のような出力が行われます。

connect outbound handler called
write outbound handler called
flush outbound handler called
read outbound handler called
hello
read outbound handler called
read outbound handler called