Spring Boot-实现Apache ActiveMQ消息中间件

LeopoldPete 发布于1年前
0 条问题

前言

在分布式系统中消息通信技术主要包括以下几种:

  • RPC(Remote Procedure Call Protocol):一般是C/S方式,同步的,跨语言跨平台,面向过程。
  • CORBA(Common Object Request Broker Architecture):CORBA从概念上扩展了RPC。面向对象的,企业级的(面向对象中间件还有DCOM)。
  • RMI(Remote Method Invocation):面向对象方式的 Java RPC。
  • WebService:基于Web,C/S或B/S,跨系统跨平台跨网络。多为同步调用, 实时性要求较高
  • MOM(Message oriented Middleware):面向消息中间件,主要适用于消息通道、消息总线、消息路由和发布/订阅的场景。目前主流标准有JMS(Java Message Service)、AMQP(Advanced Message Queuing Protocol)和STOMP(Streaming Text Oriented Messaging Protocol)。其中JMS是Java平台上的面向接口的消息规范,是一套API标准,并有考虑异构系统。AMQP是一个面向协议的,跟语言平台无关的消息传递应用层协议规范。STOMP是流文本定向消息协议,是一种为MOM设计的简单文本协议,在使用websocket通信时可以使用该协议来中继消息中间件功能。AMQP和STOMP都是跟http处于同一层的协议,在 AMQP 模型中,消息的 producer 将 Message 发送给 Exchange,Exchange 负责交换/路由,将消息正确地转发给相应的 Queue。消息的 Consumer 从 Queue 中读取消息。

这些通信方式中MOM消息中间件一般是分布式系统中重要的组件,主要解决应用耦合,异步消息,流量削锋等问题。实现高性能,高可用,可伸缩和最终一致性架构,是大型分布式系统不可缺少的中间件。目前在生产环境,使用较多的消息队列有ActiveMQRabbitMQZeroMQKafkaRocketMQ 等,本文介绍其中的一种ActiveMQ消息中间件来展开消息中间件的使用过程。

What-什么是ActiveMQ

ActiveMQ是一种开源的,实现了JMS规范的,面向消息(MOM)的中间件,为应用程序提供高效的、可扩展的、稳定的和安全的企业级消息通信。它可以部署于代理模式和P2P模式。完全支持JMS1.1和J2EE 1.4规范。跨平台的,多种语言和协议编写客户端,Java, C, C++, C#, Ruby, Perl, Python, PHP。应用协议: OpenWire, Stomp REST, WS Notification, XMPP, AMQP。如需配置ActiveMQ则需要在目标机器上安装Java环境。支持集群,同等网络,自动检测,TCP,SSL,广播,持久化,XA,多个消息也可以组成原子事务。
授权协议: Apache
开发语言: Java
操作系统: 跨平台

Why-为什么要用ActiveMQ/消息中间件

要理解这个问题得从一个实际项目业务说起,举个例子。

业务场景:某财务系统A需要给某甲方公司开发合并报表业务,其中A系统的合并报表业务的基础是需要凭证作为基准源数据(A系统不生产凭证,只是凭证的搬运工)。但是甲方公司历史已存在一套凭证系统B专门做凭证业务(B系统历史凭证数量大概1000万,同时每天会产生2万数量的新凭证),外加合并报表业务处理的实时性不是非常严格,一般一个月出一次报表即可。因此就衍生出如下业务需求。
业务需求描述:B系统需要把历史存在的凭证数据和每天新产生的凭证都需要推送给A系统,然后A系统根据传过来的凭证做凭证的后续合并报表业务处理。
系统方案设计:为了解决该业务需求,A系统按约定开发了一套基于WebService技术的接口服务来专门负责接收B系统的凭证数据,A系统接收凭证后需要做凭证数据合法性校验、重复数据校验、保存等及一系列业务处理。
初步处理方案:A系统针对每个B系统传来的接口请求线程的处理都包括凭证校验、保存等处理都采用同步通信处理完后即时反馈处理结果给B系统的。这种方式的处理造成单个请求线程处理耗时较长,平均算下来基本1条数据的处理大致得花费1秒时间。而这种处理方式生产过程中存在非常大的性能问题,在B系统海量数据推送的情况下,按1秒1条的话,B系统当天推送的凭证A系统当天来不及处理完,这样就会堆积在第二天继续处理,但是B系统第二天又会传新的一批大量凭证,这样就导致A系统处理的还是第一天B系统传输的凭证数据,第二天的数据一直压着消费不了,时间一久系统的业务可用性大大降低。所以该方案需要废弃优化。
优化处理方案:B系统每个接口请求线程同步推送值A接口服务中,A系统接收后即时将消息转发至消息中间件ActiveMQ的队列中然后结束本次接口调用。系统A在内部单独提供消息中间件的多个消费者来异步消费中间件队列里面的凭证数据,每消费一条消息就校验保存操作,其中不满足业务的记录将错误系统通过与B系统约定的另一个接口将处理成功、处理错误信息反馈结果推送到B接口服务中,保存后的数据再通过计划任务或者手工触发来异步做合并业务处理,这样再经过一系列细节优化后,最终实现了业务处理的高可用性。

通过上述例子我们可以看出消息中间件在处理业务起到了很大的作用,其作用取决于其拥有的特点,对于传统的通信一般分为同步通信(比如:消息中间件)和异步通信(比如:RPC)。对于如今的分布式系统,消息队列已经演变为独立的消息中间件产品,相比于RPC同步通信的方式来说有几个明显的优势:

  • 低耦合:不管是程序还是模块之间,使用消息中间件进行间接通信。
  • 消息的顺序性:消息队列可以保证消息的先进先出。
  • 消息可靠传输:持久化的存储使得消息只有在被消费之后才会删除。
  • 异步通信能力:相对于RPC来说,异步通信使得生产者和消费者得以充分执行自己的逻辑而无需等待。
  • 缓冲/堆积能力:消息中间件像是一个巨大的蓄水池,将高峰期大量的请求存储下来慢慢交给后台进行处理,对于秒杀业务来说尤为重要,比如kafka具有强大的堆积能力可以用于日志通信中间件。

但是异步通信也存在程序设计和编程方面的复杂,同时对于实时性要求较高的业务也不能采用异步通信,所以要根据业务具体分析。

消息中间件ActiveMQ的JMS支持两种消息传送模型:点对点消息通信模型发布订阅模型

  • 点对点(PTP)消息通信模型:也可称之为队列Queue模式,特定的一条消息只能被一个消费者消费。生产者将消息发送到指定的Queue当中,Broker(中间件)针对消息是否需要持久化进行持久化存储后通知消费者进行处理,消费者处理完毕后发送一个回执(Acknowledge)给Broker,Broker认为该消息已被正常消费,于是从持久化存储中删除该条消息,回执的发送逻辑内嵌在MQ的API中,无需主动调用。消费者通常可以通过两种方式获取新消息:Push和Pull。Push方式:由ActiveMQ收到消息后主动调用消费者的新消息通知接口,需要消耗ActiveMQ宝贵的线程资源,同时消费者只能被动等待消息通知。Pull方式:由消费者轮询调用 ActiveMQ API 去获取消息,不消耗ActiveMQ 线程,消费者更加主动,虽然消费者的处理逻辑变得稍稍复杂。两种方式的根本区别在于线程消耗问题,由于ActiveMQ 的线程资源相对客户端更加宝贵,Push方式会占用ActiveMQ 过多的线程从而难以适应高并发的消息场景。同时当某一消费者离线一段时间再次上线后,大量积压消息处理会消耗大量ActiveMQ 线程从而拖累其它消费者的消息处理,所以Pull方式相对来说更好(Kafka消息中间件已经抛弃了PUSH模式,全面拥抱PULL模式)。
  • 发布/订阅模式(Pub/Sub):也可称之为主题Topic模式,特定的一条消息可以被多个消费者所接收,只要消费者订阅了某个主题。消息生产者(发布者)将消息发送到某个称为主题(Topic)的虚拟通道中,Topic可以被多个消费者订阅,因此该模式类似于广播的方式。发布/订阅模式采用PUSH的方式传送消息,Subscriber只需保持在线即可。Subscriber分为临时性的和持久性的,当订阅者离线时,ActiveMQ 会为持久性的Sub持久化消息,当Sub恢复时会重新收到消息。但是既然采用Pub/Sub模式就表明允许部分消费者接收不到消息,所以通常会采用临时性的Subscriber而不是持久性的。

两种模式选择可在具体业务场景下选择合适的模式来开发,业务要求实时性非常高的不建议使用消息中间件,一切以实际业务场景出发,避免乱使用造成数据紊乱引发骚动哈哈。

How-如何使用ActiveMQ

ActiveMQ依赖于Java,所以需要先安装JDK,然后在安装ActiveMQ消息中间件,然后编写代码实现。

JDK10安装

下载JDK10

官网地址:http://www.oracle.com/technet...

安装jdk10

将下载的jdk-10_linux-x64_bin.tar.gz文件拷贝到/usr/local/src目录.

解压文件到/usr/local/bin,先进入解压目录下cd /usr/local/bin,然后使用命令 tar -zxvf /usr/local/src/jdk-10_linux-x64_bin.tar.gz 执行解压。

配置JAVA_HOME环境变量。
使用命令:vim /etc/profile 编辑配置文件,在vim中插入数据按键盘上的i或者insert,然后添加如下JAVA_HOME内容:

export JAVA_HOME=/usr/local/bin/jdk-10
export CLASSPATH=$JAVA_HOME/lib/
export PATH=$PATH:$JAVA_HOME/bin
export PATH JAVA_HOME CLASSPATH

按esc退出insert模式,再按:输入wq,保存并且退出文件编辑。

重启服务器或者执行配置立即生效命令,刷新配置文件生效命令(我用的这个方法):source /etc/profile ,或者执行重启服务器命令:sudo shutdown -r now ,然后执行java -version验证安装是否成功。

出现版本号即表示安装成功,JDK安装成功后接下来安装ActiveMQ。

ActiveMQ安装

下载ActiveMQ

官网地址: http://activemq.apache.org/

安装ActiveMQ

将下载的apache-activemq-5.15.3-bin.tar.gz文件拷贝到/usr/local/src目录。

解压文件到/usr/local/bin ,先进入解压目录下cd /usr/local/bin,然后使用命令 tar -zxvf /usr/local/src/apache-activemq-5.15.3-bin.tar.gz 执行解压。

然后进入cd /usr/local/bin/apache-activemq-5.15.3/bin目录下执行使用后端启动模式命令: ./activemq start 运行activemq,执行后使用命令 ps -ef | grep -i activemq 查看进程。

启动后可以看到activemq默认使用了8161(监控平台)和61616(tcp通信服务)端口。

默认端口配置可以自己修改,修改ActiveMQ的tcp通信服务端口,修改服务地址和端口:打开conf/activemq.xml文件,找到如下部分,修改红色部分即可: 

修改监控平台地址和端口:打开conf/jetty.xml文件,找到如下部分,修改红色部分即可: 

进入目录cd /etc/sysconfig 执行命令 vim iptables 开启8161, 61616防火墙端口。

然后刷新服务。

由于我使用的是阿里云的服务器部署的,所以需要登陆ECS云服务器上配置安全组策略配置8161, 61616入口规则才能在外网访问。

配置完后使用浏览器然后访问http://47.93.63.64:8161/admin/进入管理平台,默认登陆用户名和密码都为admin。

配置了域名解析后也可以使用域名访问.

经过以上部署后准备单点服务的简单版工作已做完,下面编写代码实现消息的生产和消费功能。

ActiveMQ代码开发

原始的 ActiveMQ API 处理流程图:

由于Spring Boot提供了大量的自动配置和注解功能,已经把这部分代码封装好了,所以开发起来很方便。

工程添加ActiveMQ依赖

Gradle构建工具依赖添加:

// 集成Active MQ消息中间件
compile group: 'org.springframework.boot', name: 'spring-boot-starter-activemq', version: '2.0.0.RELEASE'

  • 编写application.properties属性配置
##################################---Active MQ消息中间件---##############################################
spring.activemq.broker-url=tcp://www.javalsj.com:61616
spring.activemq.user=admin
spring.activemq.password=admin
  • 编写ActiveMQConfiguration配置类代码:
package com.javalsj.blog.activemq;

import javax.jms.ConnectionFactory;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.config.JmsListenerContainerFactory;
import org.springframework.jms.config.SimpleJmsListenerContainerFactory;

/**
 * @description ActiveMQ消息队列配置类
 * @author WANGJIHONG
 * @date 2018年3月25日 下午10:52:26 
 * @Copyright 版权所有 (c) www.javalsj.com
 * @memo 无备注说明
 */
@Configuration
public class ActiveMQConfiguration {
    
    /** 
     * 在Queue模式中,对消息的监听需要对containerFactory进行配置
     */ 
    @Bean(ActiveMQQueueConst.BEAN_NAME_JMSLISTENERCONTAINERFACTORY)
    public JmsListenerContainerFactory<?> queueJmsListenerContainerFactory(ConnectionFactory connectionFactory){
        SimpleJmsListenerContainerFactory factory = new SimpleJmsListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setPubSubDomain(false);
        return factory;
    }
    
    /** 
     * 在Topic模式中,对消息的监听需要对containerFactory进行配置
     */ 
    @Bean(ActiveMQTopicConst.BEAN_NAME_JMSLISTENERCONTAINERFACTORY)
    public JmsListenerContainerFactory<?> topicJmsListenerContainerFactory(ConnectionFactory connectionFactory){
        SimpleJmsListenerContainerFactory factory = new SimpleJmsListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setPubSubDomain(true);
        return factory;
    }
}
  • 编写ActiveMQQueueConst队列常量类代码:
package com.javalsj.blog.activemq;

/**
 * @description ActiveMQ队列常量
 * @author WANGJIHONG
 * @date 2018年3月25日 下午10:59:47
 * @Copyright 版权所有 (c) www.javalsj.com
 * @memo 无备注说明
 */
public class ActiveMQQueueConst {

    /** 
     * 在Queue模式中,对消息的监听需要对containerFactory进行配置,工厂标识
     */ 
    public static final String BEAN_NAME_JMSLISTENERCONTAINERFACTORY = "queueJmsListenerContainerFactory";    
    
    /**
     * 队列消息标识_WebSocket的Java老司机聊天室
     */
    public static final String QUEUE_NAME_WEBSOCKET_CHATROOM_JAVALSJ = "queue.websocket.chatroom.javalsj";

}
  • 编写ActiveMQQueueProducer队列生产者代码:
package com.javalsj.blog.activemq;

import javax.jms.Destination;

import org.apache.activemq.command.ActiveMQQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.stereotype.Component;

/**
 * @description ActiveMQ消息生产者
 * @author WANGJIHONG
 * @date 2018年3月25日 下午10:57:54
 * @Copyright 版权所有 (c) www.javalsj.com
 * @memo 无备注说明
 */
@Component
public class ActiveMQQueueProducer {
    
    private final static Logger logger = LoggerFactory.getLogger(ActiveMQQueueProducer.class);

    @Autowired
    private JmsMessagingTemplate jmsMessagingTemplate;
    
    /** 
     * 发送队列消息
     * @param destinationName 消息目的地标识
     * @param message 消息文本
     */ 
    public void sendMsg(String destinationName, String message) {
        logger.info("发布了一条队列{}消息{}。", destinationName, message);
        Destination destination = new ActiveMQQueue(destinationName);
        jmsMessagingTemplate.convertAndSend(destination, message);
    }
    
}
  • 编写ActiveMQQueueConsumer队列消费者代码:
package com.javalsj.blog.activemq;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;

/**
 * @description ActiveMQ队列消息消费者
 * @author WANGJIHONG
 * @date 2018年3月25日 下午10:59:10
 * @Copyright 版权所有 (c) www.javalsj.com
 * @memo 无备注说明
 */
@Component
public class ActiveMQQueueConsumer {

    private final static Logger logger = LoggerFactory.getLogger(ActiveMQQueueConsumer.class);

    /**
     * WebSocket的Java老司机聊天室队列消息消费者
     */
    @JmsListener(destination = ActiveMQQueueConst.QUEUE_NAME_WEBSOCKET_CHATROOM_JAVALSJ, containerFactory = ActiveMQQueueConst.BEAN_NAME_JMSLISTENERCONTAINERFACTORY)
    public void receiveQueueWebSocketJavalsjChatroomMsg(String message) {
        logger.info("消费了一条队列{}消息{}。", ActiveMQQueueConst.QUEUE_NAME_WEBSOCKET_CHATROOM_JAVALSJ, message);
    }

}
  • 编写ActiveMQTopicConst主题消息常量类
package com.javalsj.blog.activemq;

/**
 * @description ActiveMQ主题常量 
 * @author WANGJIHONG
 * @date 2018年3月25日 下午11:24:09 
 * @Copyright 版权所有 (c) www.javalsj.com
 * @memo 无备注说明
 */
public class ActiveMQTopicConst {
    
    /** 
     * 在Topic模式中,对消息的监听需要对containerFactory进行配置,工厂标识
     */ 
    public static final String BEAN_NAME_JMSLISTENERCONTAINERFACTORY = "topicJmsListenerContainerFactory";    
    
    /**
     * 主题消息标识_WebSocket的系统公告
     */
    public static final String TOPIC_NAME_WEBSOCKET_SYSTEM_NOTICE = "topic.websocket.system.notice";
}
  • 编写ActiveMQTopicPublisher主题消息发布者代码:
package com.javalsj.blog.activemq;

import javax.jms.Destination;

import org.apache.activemq.command.ActiveMQTopic;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.stereotype.Component;

/**
 * @description ActiveMQ主题消息发布者
 * @author WANGJIHONG
 * @date 2018年3月25日 下午11:19:45 
 * @Copyright 版权所有 (c) www.javalsj.com
 * @memo 无备注说明
 */
@Component
public class ActiveMQTopicPublisher {
    private final static Logger logger = LoggerFactory.getLogger(ActiveMQTopicPublisher.class);

    @Autowired
    private JmsMessagingTemplate jmsMessagingTemplate;
    
    /** 
     * 发布主题消息
     */
    public void publishMsg(String destinationName, String message) {
        logger.info("发布了一条主题{}消息{}。", destinationName, message);
        Destination destination = new ActiveMQTopic(destinationName);
        jmsMessagingTemplate.convertAndSend(destination, message);
    }
}
  • 编写ActiveMQTopicSubscriber主题消息订阅者代码:
package com.javalsj.blog.activemq;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;

/**
 * @description ActiveMQ主题消息订阅者
 * @author WANGJIHONG
 * @date 2018年3月25日 下午11:22:50
 * @Copyright 版权所有 (c) www.javalsj.com
 * @memo 无备注说明
 */
@Component
public class ActiveMQTopicSubscriber {

    private final static Logger logger = LoggerFactory.getLogger(ActiveMQTopicSubscriber.class);

    @JmsListener(destination = ActiveMQTopicConst.TOPIC_NAME_WEBSOCKET_SYSTEM_NOTICE, containerFactory = ActiveMQTopicConst.BEAN_NAME_JMSLISTENERCONTAINERFACTORY)
    public void subscribeTopicWebsocketSystemNoticeMsg(String message) {
        logger.info("消费了一条主题{}消息{}。", ActiveMQTopicConst.TOPIC_NAME_WEBSOCKET_SYSTEM_NOTICE, message);
    }
}
  • 编写ActiveMQTest测试类
package com.javalsj.blog.activemq;

import java.time.Instant;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

/**
 * @description 测试
 * @author WANGJIHONG
 * @date 2018年3月25日 下午11:41:03
 * @Copyright 版权所有 (c) www.javalsj.com
 * @memo 无备注说明 
 */
@Component
@EnableScheduling
public class ActiveMQTest {

    @Autowired
    private ActiveMQQueueProducer activeMQQueueProducer;

    @Autowired
    private ActiveMQTopicPublisher activeMQTopicPublisher;

    @Scheduled(fixedRate = 10000, initialDelay = 3000)
    public void test() {
        activeMQQueueProducer.sendMsg(ActiveMQQueueConst.QUEUE_NAME_WEBSOCKET_CHATROOM_JAVALSJ,
                "队列message" + Instant.now().toString());
        activeMQTopicPublisher.publishMsg(ActiveMQTopicConst.TOPIC_NAME_WEBSOCKET_SYSTEM_NOTICE,
                "主题message" + Instant.now().toString());
    }

}

重启服务验证功能使用情况,至此简单的ActiveMQ实例demo已完成。

总结

本文只是简单的介绍了下Apache ActiveMQ消息中间件简单使用,实际生产环境开发比这要复杂些,比如为了高可用性一般都需要把 ActiveMQ 部署为集群,其中消费过程也需要做业务上的去重等等一系列细节处理,感兴趣的可以网上查查资料了解下高可用消息中间件的部署和使用。也可以了解下其他类型的消息中间件产品,每种都有合适的适用场景。写了两个小时,困了,帮我关下灯,嘿嘿。

查看原文: Spring Boot-实现Apache ActiveMQ消息中间件

  • smallpeacock
  • heavypanda
  • greenduck
需要 登录 后回复方可回复, 如果你还没有账号你可以 注册 一个帐号。