반응형

myhappyman.tistory.com/191

 

Netty Client 튜토리얼 - 03 (서버에 연결이 안 될 경우 재시도하기)

myhappyman.tistory.com/189 Netty Client 튜토리얼 - 02 (메시지별 보내고 받고 끊기) myhappyman.tistory.com/187 Netty - Netty Client 튜토리얼 - 01 TCP 통신 해야하는 경우 JAVA SOCKET을 통해 서버를 구성..

myhappyman.tistory.com

 

NettyClient 튜토리얼 마지막 4장을 작성해볼까 합니다.

 

상대편(서버)에서 응답이 n초가 없는 경우 처리하기

bootstrap handler를 등록할 때 IdleStateHandler 라는 메소드를 등록하여 일정 시간동안 수신을 못받거나 응답을 못해주거나 할 때, Idle상태를 감지해주는 핸들러입니다.

 

idle상태 체크하기

bs.handler(new ChannelInitializer<SocketChannel>() {
	@Override
	protected void initChannel(SocketChannel ch) throws Exception {
    	//idle 등록
		ch.pipeline().addLast("idleStateHandler", new IdleStateHandler(5, 0, 0));
        
        
		NettyClientHandler handler = new NettyClientHandler(msgArr[idx]);		
		ch.pipeline().addLast("clientHandler", handler);
	}
});
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
	if(evt instanceof IdleStateEvent) {
		IdleStateEvent e = (IdleStateEvent) evt;
		if(e.state() == IdleState.READER_IDLE) {
        	//...처리할 동작
		}
	}
}

해당 핸들러에 등록된 상태가 만족되면 기존 등록하는 핸들러 중 userEventTriggered 메소드가 동작하게 되고 이 메소드에서 이벤트 종료를 받을 수 있는데 해당 이벤트가 IdleStateEvent의 참조 변수인지 확인 후 처리를 하시면 됩니다.

 

N초간 응답이 없어서 N번 전송 후 다음메시지로 넘기는 gif

 

IdleStateHandler(int readerIdleTimeSeconds, int writerIdleTimeSeconds, int allIdleTimeSeconds)

해당 핸들러 파라미터 옵션에 대하여 간단하게 알아보겠습니다.

 

각 파라미터 순서대로이며 해당 기능을 비활성하기 위해서는 '0'을 입력합니다.

1. readerIdleTimeSeconds : 읽기(수신) 시간으로 해당 시간만큼 동작하지 않으면 트리거가 발생합니다.

2. writerIdleTimeSeconds : 쓰기(발송) 시간으로 해당 시간만큼 동작하지 않으면 트리거가 발생합니다.

3. allIdleTimeSeconds : 읽기, 쓰기 모두를 지정하며 해당 시간만큼 동작하지 않으면 트리거가 발생합니다.

 

 

 

 

아래는 완성된 총 NettyClient 튜토리얼 소스입니다.

 


 

NettyClientAction.java

package com.psw.socket.nettyPrj.netty.client;

public interface NettyClientAction {
	public void close(NettyClientHandler handler);
	public void receive(NettyClientHandler handler);
}

 

NettyClient.java

package com.psw.socket.nettyPrj.netty.client;

import java.net.InetSocketAddress;
import java.net.SocketAddress;

import org.apache.log4j.Logger;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.timeout.IdleStateHandler;

/**
 * NettyClient
 * @author psw
 */
public class NettyClient {
	private static final Logger logger = Logger.getLogger(NettyClient.class);
	
	private Bootstrap bs = new Bootstrap();
	private SocketAddress addr_;
	private Channel channel_;
	private String[] msgArr;
	private int idx;
	private int fail_cnt = 0;
	private final int FAIL_COUNT_LIMIT = 3;
	private NioEventLoopGroup group;
	
	public NettyClient(SocketAddress addr, String[] msgArr) {
		this.addr_ = addr;
		this.msgArr = msgArr;
	}
	
	public NettyClient(String host, int port, String[] msgArr) {
		this(new InetSocketAddress(host, port), msgArr);
	}
	
	//실제로 동작시킬 메소드 Bootstrap 연결 옵션 설정 및 연결 처리
	public void run() {
		if(this.addr_ == null) {
			logger.error("주소 정보가 없습니다.");
		}else if(this.msgArr == null || this.msgArr.length == 0) {
			logger.error("보낼 메시지가 없습니다.");
		}
		
		group = new NioEventLoopGroup(3);
		bs.group(group)
		.channel(NioSocketChannel.class)
		.option(ChannelOption.SO_KEEPALIVE, true);
		
		doConnect();
	}
	
	private void doConnect() {
		handlerSet();
		
		bs.connect(addr_).addListener(new ChannelFutureListener() {
			public void operationComplete(ChannelFuture future) throws Exception {
				if(future.isSuccess()) {
					logger.info("연결 성공");
					logger.info(addr_ + " connect()");
					channel_ = future.channel();
				}else {
					future.channel().close(); //실패하면 기존 연결을 종료하고
					if(FAIL_COUNT_LIMIT > ++fail_cnt) {
						logger.info("연결 실패 " + fail_cnt + "/" + FAIL_COUNT_LIMIT);
						bs.connect(addr_).addListener(this); //재연결 처리를 한다.
					}else {
						logger.info(FAIL_COUNT_LIMIT + "회 연결 초과");
						bs.group().shutdownGracefully(); //eventLoop에 등록된 Thread를 종료 처리한다.
					}
				}
			}
		});
	}
	
	private void handlerSet() {
		if(bs != null) {
			bs.handler(new ChannelInitializer<SocketChannel>() {
				@Override
				protected void initChannel(SocketChannel ch) throws Exception {
					ch.pipeline().addLast("idleStateHandler", new IdleStateHandler(5, 0, 0));
					
					NettyClientHandler handler = new NettyClientHandler(msgArr[idx]);
					handler.setCallBackClientHandler(new NettyClientAction() {

						public void close(NettyClientHandler handler) {
							//종료 처리 후 더 보낼게 존재한다면 기존 옵션으로 재 연결처리를 하는 콜백 메소드
							logger.info("===== 서버가 응답이 없음 강제 종료 처리 =====");
							closeAndContinue();
						}

						public void receive(NettyClientHandler handler) {
							//응답 받은 메시지 콜백 메소드
							String receiveMsg = handler.getReceiveMsg();
							logger.info("callBack receive : "+ receiveMsg);
							closeAndContinue();
						}
						
					});
					
					ch.pipeline().addLast("clientHandler", handler);
				}
			});
		}
	}
	
	private void closeAndContinue() {
		try {
			channel_.close().sync(); //현재의 채널을 일단 닫는다.
			if(msgArr.length > ++idx) { //보낼 메시지가 남았으면 재연결 처리
				doConnect(); 
			}else { //보낼 메시지가 없다면 종료
				bs.group().shutdownGracefully(); //eventLoop에 등록된 Thread를 종료 처리한다.
			}
			
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}
}

 

NettyClientHandler.java

package com.psw.socket.nettyPrj.netty.client;

import org.apache.log4j.Logger;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;

public class NettyClientHandler extends ChannelInboundHandlerAdapter{
	
	private static final Logger logger = Logger.getLogger(NettyClientHandler.class);
	
	private String sendMsg;
	private int lostCnt_ = 0;
	private final int LIMIT_COUNT = 3;
	
	public NettyClientHandler(String msg) {
		this.sendMsg = msg;
	}
	
	private NettyClientAction action_;
	public void setCallBackClientHandler(NettyClientAction action) {
		this.action_ = action;
	}
	
	private String receiveMsg;
	public String getReceiveMsg() {
		return this.receiveMsg;
	}
	
	@Override
	public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
	}
	
	@Override
	public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
	}
	
	@Override
	public void channelActive(ChannelHandlerContext ctx) throws Exception {
		logger.info("채널이 메시지 발송할 준비가 됨.");
		msgSend(ctx);
	}
	
	@Override
	public void channelRead(ChannelHandlerContext ctx, Object msg) {
		logger.info("메시지를 받는 메소드.");
		ByteBuf buf = (ByteBuf)msg;
		int n = buf.readableBytes();
		if( n > 0 ) {
			byte[] b = new byte[n];
			buf.readBytes(b);
			//수신메시지 출력
			this.receiveMsg = new String( b );
			logger.info("handler 수신된 메시지 >" + this.receiveMsg);
			
			if(this.action_ != null) {
				action_.receive(NettyClientHandler.this);
			}
		}
	}
	
	@Override
	public void channelReadComplete(ChannelHandlerContext ctx) {
		logger.info("메시지를 받는 동작이 끝나면 동작하는 메소드.");
	}
	
	@Override
	public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
		if(evt instanceof IdleStateEvent) {
			IdleStateEvent e = (IdleStateEvent) evt;
			if(e.state() == IdleState.READER_IDLE) {
				if(LIMIT_COUNT > ++lostCnt_) { //n번을 초과하지 않았기때문에 메시지 재발송
					msgSend(ctx);
				}else { //대기시간 n초를 n번 초과하여 서버와 연결된 채널을 끊음
					if(this.action_ != null) {
						action_.close(NettyClientHandler.this);
					}
				}
			}
		}
	}
	
	private void msgSend(ChannelHandlerContext ctx) {
		ByteBuf messageBuffer = Unpooled.buffer();
		messageBuffer.writeBytes(sendMsg.getBytes());
		ctx.writeAndFlush( messageBuffer ); //메시지를 발송하고 flush처리
		logger.info("발송 메시지 >" + sendMsg);
	}
}

 

App.java

package com.psw.socket.nettyPrj;

import org.apache.log4j.Logger;

import com.psw.socket.nettyPrj.netty.client.NettyClient;

public class App {
	private static final Logger logger = Logger.getLogger(App.class);
	
	public static void main( String[] args ){
		logger.info("======= Netty Client Test =======");
		String host = "127.0.0.1";
		int port = 15510;
		String[] msgArr = {"hello world\n", "hello world2\n", "hello world3\n"};
		
		new NettyClient(host, port, msgArr).run();
	}
}

 

동작 결과

총 동작 테스트

 

 

처음 네티를 적용해보면서 경험해본 기술들을 정리해보았습니다. 잘 못 된 부분이 있다면 피드백 부탁드립니다. :D

반응형