반응형

MQTT 프로토콜은 IOT, M2M을 위한 프로토콜로 최근 IOT에서 많이 사용되는 프로토콜이라고 하여 간단하게 다뤄봤습니다.

 

Publisher와 Brocker, Subscriber 구조로 나뉘어있고 Publisher는 Topic을 발행하고 Subscriber는 특정 Topic을 감시 구독합니다. Brocker는 이를 중계하는 역할을 하며 한개의 Topic에 여러 Subscriber가 존재할 수 있는 구조를 가집니다.

 

여기서 Brocker로 ActiveMQ를 사용해보았습니다.

 

ActiveMQ는 위에서도 말했듯이 메시지 브로커로 오픈소스입니다.

 

dependency 추가를하여 사용하기 위해 Maven프로젝트로 구성하여 작성하였습니다.

 

mqttv3 - dependency추가

<dependency>
  <groupId>org.eclipse.paho</groupId>
  <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
  <version>1.2.0</version>
</dependency>

 

 

MqttService.java

package com.psw.mqtts.service;

public interface MqttService{
	public String connect(String connect_url, String clientId, String username, String password, String[] subscribe);
	public String sendMsg(String topic, String msg);
	public void disConnect();
}

interface로 연결 메시지발송 종료를 담당하는 메소드를 작성하였습니다.

MQTT.java에서 상속받아 사용할 것입니다.

 

 

MQTT.java

package com.psw.mqtts.utils;

import java.util.function.Consumer;
import java.util.function.Function;

import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;

import com.psw.mqtts.service.MqttService;

public class MQTT implements MqttCallback, MqttService{
	private String receiveMsg;
	private String inputMsg;
	private MqttClient mqttClient;
	
	private Consumer<MQTT> receiver;
	private Consumer<String> delivery;
	private Consumer<String> lost;
	
	final static Function<Object, Boolean> STRING_NVL = str -> str != null && !((String) str).trim().equals("") ? false : true;
	
	final static Function<Object, Boolean> STRING_ARR_NVL = str -> str != null && ((String[]) str).length > 0 ? true : false;
	
	public MQTT() {
		this.receiveMsg = null;
		this.inputMsg = null;
		this.mqttClient = null;
		this.receiver = null;
		this.delivery = null;
		this.lost = null;
	}

	public void setReceiver(Consumer<MQTT> receiver) {
		this.receiver = receiver;
	}

	
	public void setDelivery(Consumer<String> delivery) {
		this.delivery = delivery;
	}
	
	
	public void setLost(Consumer<String> lost) {
		this.lost = lost;
	}


	public String getInputMsg() {
		return inputMsg;
	}

	
	public void setInputMsg(String inputMsg) {
		this.inputMsg = inputMsg;
	}
	
	
	public String getReceiveMsg() {
		return receiveMsg;
	}
	
	
	public void setReceiveMsg(String receiveMsg) {
		this.receiveMsg = receiveMsg;
	}
	
	
	public MqttClient getMqttClient() {
		return mqttClient;
	}
	
	
	public void setMqttClient(MqttClient mqttClient) {
		this.mqttClient = mqttClient;
	}
	
	
	/**
	 * mqtt 연결처리 함수
	 * @param connect_url 연결정보
	 * @param clientId 연결 클라이언트 ID
	 * @param username 유저정보
	 * @param password 비밀번호
	 * @param subscribe 구독할 토픽 배열 //데이터가 없으면 구독을 하지 않음
	 * @return Map
	 */
	@Override
	public String connect(String connect_url, String clientId, String username, String password, String[] subscribe) {
		String res = "";
		if(STRING_NVL.apply(connect_url)) { return "연결정보를 입력해주세요.";}
		if(STRING_NVL.apply(clientId)) { return "클라이언트 ID를 입력해주세요.";}
		if(STRING_NVL.apply(username)) { return "유저정보를 입력해주세요.";}
		if(STRING_NVL.apply(password)) { return "비밀번호를 입력해주세요.";}
		
		if(mqttClient == null) {
			try {
				mqttClient = new MqttClient(connect_url, clientId);
				MqttConnectOptions connOpts = new MqttConnectOptions();
				connOpts.setUserName(username);
				connOpts.setPassword(password.toCharArray());
				
				mqttClient.connect(connOpts);
    	        mqttClient.setCallback(this);
    	        if(STRING_ARR_NVL.apply(subscribe)) {
    	        	mqttClient.subscribe(subscribe);
    	        }
    	        setMqttClient(mqttClient);
    	        res = "정상정으로 연결 되었습니다.";
			} catch (MqttException e) {
				e.printStackTrace();
				return e.getMessage();
			}
		}else {
			res = "기존에 연결되어 있는 정보가 존재합니다.";
		}
		return res;
	}
	
	
	/**
	 * 메시지 전송 함수
	 * @param topic 바라볼 토픽
	 * @param msg 전송할 메시지
	 * @return String
	 */
	@Override
	public String sendMsg(String topic, String msg) {
		String res = "";
		if(STRING_NVL.apply(topic)) { return "전송하고자 하는 Topic을 입력하세요."; }
		if(STRING_NVL.apply(msg)) { return "전송하고자 하는 메시지를 입력하세요."; }
		if(STRING_NVL.apply(topic)) { return "연결하고자 하는 토픽 "; }
		
		if(mqttClient != null && !STRING_NVL.apply(msg)) {
			if(msg.equals("exit")) {
				disConnect();
			}else {
				try {
	        		MqttMessage message = new MqttMessage(msg.getBytes());
	        		mqttClient.publish(topic, message);
	        		res = "1";
				} catch (MqttException e) {
					e.printStackTrace();
					res = e.getMessage();
				}
			}
    	}else {
    		res = "activeMq 연결을 먼저 해주세요.";
    	}
		return res;
	}
	
	@Override
	public void disConnect() {
		if(mqttClient != null) {
			try {
				mqttClient.disconnect();
				mqttClient.close();
			} catch (MqttException e) {
				e.printStackTrace();
			}
		}
	}

	@Override
	public void connectionLost(Throwable cause) {
		if(lost != null) {
			lost.accept("연결이 종료되었습니다.");
		}
	}

	@Override
	public void messageArrived(String topic, MqttMessage message) throws Exception {
		String res = "topic : " + topic + "  ||  message : " + message;
		setReceiveMsg(res);
		if(receiver != null) {
			receiver.accept(this);
		}
	}

	@Override
	public void deliveryComplete(IMqttDeliveryToken token) {
		if(delivery != null) {
			delivery.accept("메시지가 정상적으로 전달되었습니다.");
		}
	}
}

의존성 추가로 생성된 MqttCallback과 MqttService를 상속받아 작성하였습니다.

receiver, delivery, lost Consumer들은 콜백함수로 구현하여 각각 사용부분에서 메소드가 동작하면 데이터를 받아와서 동작시키도록 처리하였습니다.(connectionLost, messageArrived, deliveryComplete)

 

connect메소드에서는 subscribe배열이 존재하는지 체크하고 존재한다면 구독을 할 수 있도록 설정하였습니다.

 

sendMsg메소드에서는 입력으로 들어오는 문자열값이 exit가 들어오면 종료메소드를 실행하도록하여 연결을 종료하고 닫도록 처리하였고, 그 외에는 입력 topic으로 메시지를 전달합니다.

 

 

App.java

package com.psw.mqtts.main;

import java.util.Scanner;
import java.util.function.Consumer;

import org.eclipse.paho.client.mqttv3.MqttClient;

import com.psw.mqtts.utils.MQTT;

public class App{
	
	private static String input = "";
	
    public static void main( String[] args ){
    	String res = "";
    	MQTT mqtt = new MQTT();
    	
    	Consumer<MQTT> recv = (cons)->{
    		System.out.println("Received message // " +  cons.getReceiveMsg());
    	};
    	
    	Consumer<String> deli = (cons)->{
    		System.out.println(cons);
    	};
    	
    	Consumer<String> lost = (cons)->{
    		System.out.println(cons);
    	};
    	
    	mqtt.setReceiver(recv); //메시지가 수신되면 동작
    	mqtt.setDelivery(deli); //메시지가 정상적으로 전달되면 동작
    	mqtt.setLost(lost); //연결이 끊기면 동작
    	
    	
    	// 연결정보
    	String connect_url = "tcp://localhost:61616"; // "tcp://localhost:61616"
    	String clientId = "psw_test"; 
    	String username = "admin"; 
    	String password = "admin";
    	String[] subscribe = {"psw", "kjg"};
    	
    	//activemq 연결
    	res = mqtt.connect(connect_url, clientId, username, password, subscribe);
    	System.out.println(res); //연결 메시지 출력
    	
    	//연결 후 클라이언트 정보를 가져옴
    	MqttClient mqttClient = mqtt.getMqttClient();
    	if(mqttClient != null) { //정상적으로 접근되어 클라이언트가 비어있지 않다면 메시지 발송 진행
    		Scanner sc = new Scanner(System.in); //입력을 위해 Scanner 객체 생성
        	while(input != null && !input.equals("exit")) { //exit 문자열 입력이 들어올 때까지 동작
        		System.out.println("발송하실 메시지를 입력하세요. 연결을 종료하시려면 \"exit\"를 입력하세요");
        		input = sc.nextLine(); //입력 받기
        		mqtt.setInputMsg(input); //입력 데이터 저장
        		mqtt.sendMsg("psw", mqtt.getInputMsg()); //메시지 전송하고자 하는 topic으로 발송
        	}
        	
        	//종료가 되면 Scanner 닫기
        	try {
        		sc.close();
        	}catch(Exception e) {
        		e.printStackTrace();
        	}
    	}else {
    		System.out.println("연결 할 수 없습니다.");
    	}
    	
    	//연결 종료
    	mqtt.connectionLost(new Throwable());
    	
    	//시스템 종료
    	System.exit(0);
    }
}

사용예제입니다.

 

연결 후 메시지를 출력하고 Scanner를 통해 입력받은 문자열을 특정 topic으로 전달합니다.

subscribe로 psw, kjg를 처리하여 해당 topic에 메시지가 들어오면 messageArrived메소드가 실행되고 콜백함수로 인해 receiver가 동작하면서 구독한 메시지를 출력합니다.

반응형