您好,登錄后才能下訂單哦!
本文介紹了spring集成mina實現服務端主動推送(包含心跳檢測),分享給大家,具體如下:
服務端
1.常規的spring工程集成mina時,pom.xml中需要加入如下配置:
<dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-jdk14</artifactId> <version>1.7.7</version> </dependency> <dependency> <groupId>org.apache.mina</groupId> <artifactId>mina-integration-beans</artifactId> <version>2.0.13</version> </dependency> <dependency> <groupId>org.apache.mina</groupId> <artifactId>mina-core</artifactId> <version>2.0.13</version> <type>bundle</type> <scope>compile</scope> </dependency> <dependency> <groupId>org.apache.mina</groupId> <artifactId>mina-integration-spring</artifactId> <version>1.1.7</version> </dependency>
注意此處mina-core的配置寫法。如果工程中引入上述依賴之后報錯:Missing artifact xxx bundle,則需要在pom.xml的plugins之間加入如下插件配置:
<plugin> <groupId>org.apache.felix</groupId> <artifactId>maven-bundle-plugin</artifactId> <extensions>true</extensions> </plugin>
2.Filter1:編解碼器,實現ProtocolCodecFactory解碼工廠
package com.he.server; import java.nio.charset.Charset; import org.apache.mina.core.session.IoSession; import org.apache.mina.filter.codec.ProtocolCodecFactory; import org.apache.mina.filter.codec.ProtocolDecoder; import org.apache.mina.filter.codec.ProtocolEncoder; import org.apache.mina.filter.codec.textline.LineDelimiter; import org.apache.mina.filter.codec.textline.TextLineDecoder; import org.apache.mina.filter.codec.textline.TextLineEncoder; public class MyCodeFactory implements ProtocolCodecFactory { private final TextLineEncoder encoder; private final TextLineDecoder decoder; public MyCodeFactory() { this(Charset.forName("utf-8")); } public MyCodeFactory(Charset charset) { encoder = new TextLineEncoder(charset, LineDelimiter.UNIX); decoder = new TextLineDecoder(charset, LineDelimiter.AUTO); } public ProtocolDecoder getDecoder(IoSession arg0) throws Exception { // TODO Auto-generated method stub return decoder; } public ProtocolEncoder getEncoder(IoSession arg0) throws Exception { // TODO Auto-generated method stub return encoder; } public int getEncoderMaxLineLength() { return encoder.getMaxLineLength(); } public void setEncoderMaxLineLength(int maxLineLength) { encoder.setMaxLineLength(maxLineLength); } public int getDecoderMaxLineLength() { return decoder.getMaxLineLength(); } public void setDecoderMaxLineLength(int maxLineLength) { decoder.setMaxLineLength(maxLineLength); } }
此處使用了mina自帶的TextLineEncoder編解碼器,此解碼器支持使用固定長度或者固定分隔符來區分上下兩條消息。如果要使用自定義協議,則需要自己編寫解碼器。要使用websocket,也需要重新編寫解碼器,關于mina結合websocket,jira上有一個開源項目https://issues.apache.org/jira/browse/DIRMINA-907,專門為mina編寫了支持websocket的編解碼器,親測可用。。。此部分不是本文重點,略。
3.Filter2:心跳工廠,加入心跳檢測功能需要實現KeepAliveMessageFactory:
package com.he.server; import org.apache.log4j.Logger; import org.apache.mina.core.session.IoSession; import org.apache.mina.filter.keepalive.KeepAliveMessageFactory; public class MyKeepAliveMessageFactory implements KeepAliveMessageFactory{ private final Logger LOG = Logger.getLogger(MyKeepAliveMessageFactory.class); /** 心跳包內容 */ private static final String HEARTBEATREQUEST = "1111"; private static final String HEARTBEATRESPONSE = "1112"; public Object getRequest(IoSession session) { LOG.warn("請求預設信息: " + HEARTBEATREQUEST); return HEARTBEATREQUEST; } public Object getResponse(IoSession session, Object request) { LOG.warn("響應預設信息: " + HEARTBEATRESPONSE); /** 返回預設語句 */ return HEARTBEATRESPONSE; } public boolean isRequest(IoSession session, Object message) { LOG.warn("請求心跳包信息: " + message); if (message.equals(HEARTBEATREQUEST)) return true; return false; } public boolean isResponse(IoSession session, Object message) { LOG.warn("響應心跳包信息: " + message); if(message.equals(HEARTBEATRESPONSE)) return true; return false; } }
此處我設置服務端發送的心跳包是1111,客戶端應該返回1112.
4.實現必不可少的IoHandlerAdapter,得到監聽事件處理權:
package com.he.server; import java.net.InetSocketAddress; import java.util.Collections; import java.util.HashSet; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import org.apache.log4j.Logger; import org.apache.mina.core.service.IoHandlerAdapter; import org.apache.mina.core.session.IdleStatus; import org.apache.mina.core.session.IoSession; public class MyHandler extends IoHandlerAdapter { //private final int IDLE = 3000;//(單位s) private final Logger LOG = Logger.getLogger(MyHandler.class); // public static Set<IoSession> sessions = Collections.synchronizedSet(new HashSet<IoSession>()); public static ConcurrentHashMap<Long, IoSession> sessionsConcurrentHashMap = new ConcurrentHashMap<Long, IoSession>(); @Override public void exceptionCaught(IoSession session, Throwable cause) throws Exception { session.closeOnFlush(); LOG.warn("session occured exception, so close it." + cause.getMessage()); } @Override public void messageReceived(IoSession session, Object message) throws Exception { String str = message.toString(); LOG.warn("客戶端" + ((InetSocketAddress) session.getRemoteAddress()).getAddress().getHostAddress() + "連接成功!"); session.setAttribute("type", message); String remoteAddress = ((InetSocketAddress) session.getRemoteAddress()).getAddress().getHostAddress(); session.setAttribute("ip", remoteAddress); LOG.warn("服務器收到的消息是:" + str); session.write("welcome by he"); } @Override public void messageSent(IoSession session, Object message) throws Exception { LOG.warn("messageSent:" + message); } @Override public void sessionCreated(IoSession session) throws Exception { LOG.warn("remote client [" + session.getRemoteAddress().toString() + "] connected."); // my Long time = System.currentTimeMillis(); session.setAttribute("id", time); sessionsConcurrentHashMap.put(time, session); } @Override public void sessionClosed(IoSession session) throws Exception { LOG.warn("sessionClosed."); session.closeOnFlush(); // my sessionsConcurrentHashMap.remove(session.getAttribute("id")); } @Override public void sessionIdle(IoSession session, IdleStatus status) throws Exception { LOG.warn("session idle, so disconnecting......"); session.closeOnFlush(); LOG.warn("disconnected."); } @Override public void sessionOpened(IoSession session) throws Exception { LOG.warn("sessionOpened."); // //session.getConfig().setIdleTime(IdleStatus.BOTH_IDLE, IDLE); } }
此處有幾點說明:
第一點:網上教程會在此處(sessionOpened()方法中)設置IDLE,IDLE表示session經過多久判定為空閑的時間,單位s,上述代碼中已經注釋掉了,因為后面在spring配置中加入心跳檢測部分時會進行IDLE的配置,已經不需要在此處進行配置了,而且如果在心跳配置部分和此處都對BOTH_IDLE模式設置了空閑時間,親測發現此處配置不生效。
第二點:關于存放session的容器,建議使用
而不是用已經注釋掉的Collections.synchronizedSet類型的set或者map,至于原因,java5中新增了ConcurrentMap接口和它的一個實現類ConcurrentHashMap,可以保證線程的足夠安全。詳細的知識你應該搜索SynchronizedMap和ConcurrentHashMap的區別,學習更加多的并發安全知識。
上述代碼中,每次在收到客戶端的消息時,我會返回一段文本:welcome by he。
有了map,主動推送就不是問題了,想推給誰,在map中找到誰就可以了。
5.完成spring的配置工作
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:p="http://www.springframework.org/schema/p" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd"> <bean class="org.springframework.beans.factory.config.CustomEditorConfigurer"> <property name="customEditors"> <map> <entry key="java.net.SocketAddress" value="org.apache.mina.integration.beans.InetSocketAddressEditor"> </entry> </map> </property> </bean> <bean id="ioAcceptor" class="org.apache.mina.transport.socket.nio.NioSocketAcceptor" init-method="bind" destroy-method="unbind"> <!--端口號--> <property name="defaultLocalAddress" value=":8888" /> <!--綁定自己實現的handler--> <property name="handler" ref="serverHandler" /> <!--聲明過濾器的集合--> <property name="filterChainBuilder" ref="filterChainBuilder" /> <property name="reuseAddress" value="true" /> </bean> <bean id="filterChainBuilder" class="org.apache.mina.core.filterchain.DefaultIoFilterChainBuilder"> <property name="filters"> <map> <!--mina自帶的線程池filter--> <entry key="executor" value-ref="executorFilter" /> <entry key="mdcInjectionFilter" value-ref="mdcInjectionFilter" /> <!--自己實現的編解碼器filter--> <entry key="codecFilter" value-ref="codecFilter" /> <!--日志的filter--> <entry key="loggingFilter" value-ref="loggingFilter" /> <!--心跳filter--> <entry key="keepAliveFilter" value-ref="keepAliveFilter" /> </map> </property> </bean> <!-- executorFilter多線程處理 --> <bean id="executorFilter" class="org.apache.mina.filter.executor.ExecutorFilter" /> <bean id="mdcInjectionFilter" class="org.apache.mina.filter.logging.MdcInjectionFilter"> <constructor-arg value="remoteAddress" /> </bean> <!--日志--> <bean id="loggingFilter" class="org.apache.mina.filter.logging.LoggingFilter" /> <!--編解碼--> <bean id="codecFilter" class="org.apache.mina.filter.codec.ProtocolCodecFilter"> <constructor-arg> <!--構造函數的參數傳入自己實現的對象--> <bean class="com.he.server.MyCodeFactory"></bean> </constructor-arg> </bean> <!--心跳檢測filter--> <bean id="keepAliveFilter" class="org.apache.mina.filter.keepalive.KeepAliveFilter"> <!--構造函數的第一個參數傳入自己實現的工廠--> <constructor-arg> <bean class="com.he.server.MyKeepAliveMessageFactory"></bean> </constructor-arg> <!--第二個參數需要的是IdleStatus對象,value值設置為讀寫空閑--> <constructor-arg type = "org.apache.mina.core.session.IdleStatus" value="BOTH_IDLE" > </constructor-arg> <!--心跳頻率,不設置則默認60s --> <property name="requestInterval" value="5" /> <!--心跳超時時間,不設置則默認30s --> <property name="requestTimeout" value="10" /> <!--不設置默認false--> <property name="forwardEvent" value="true" /> </bean> <!--自己實現的handler--> <bean id="serverHandler" class="com.he.server.MyHandler" /> </beans>
好了,xml中已經寫了足夠多的注釋了。說明一下關于心跳檢測中的最后一個屬性:forwardEvent,默認false,比如在心跳頻率為5s時,實際上每5s會觸發一次KeepAliveFilter中的session_idle事件,該事件中開始發送心跳包。當此參數設置為false時,對于session_idle事件不再傳遞給其他filter,如果設置為true,則會傳遞給其他filter,例如handler中的session_idle事件,此時也會被觸發。另外IdleStatus一共有三個值,點擊進源碼就能看到。
6.寫main方法啟動服務端
package com.he.server; import org.springframework.context.ApplicationContext; import org.springframework.context.support.ClassPathXmlApplicationContext; public class MainTest { public static void main(String[] args) { ClassPathXmlApplicationContext ct = new ClassPathXmlApplicationContext("applicationContext.xml"); } }
run之后,端口就已經開始監聽了。此處,如果是web工程,使用tomcat之類的容器,只要在web.xml中配置了
<context-param> <param-name>contextConfigLocation</param-name> <param-value>/WEB-INF/classes/applicationContext.xml</param-value> </context-param> <listener> <listener-class>org.springframework.web.context.ContextLoaderListener</listener-class> </listener>
則容器在啟動時就會加載spring的配置文件,端口的監聽就開始了,這樣就不需要main方法來啟動。
客戶端,本文采用兩種方式來實現客戶端
方式一:用mina結構來實現客戶端,引入mina相關jar包即可,Android也可以使用
1.先實現IoHandlerAdater得到監聽事件,類似于服務端:
package com.he.client.minaclient; import org.apache.log4j.Logger; import org.apache.mina.core.service.IoHandlerAdapter; import org.apache.mina.core.session.IdleStatus; import org.apache.mina.core.session.IoSession; public class ClientHandler extends IoHandlerAdapter{ private final Logger LOG = Logger.getLogger(ClientHandler.class); @Override public void messageReceived(IoSession session, Object message) throws Exception { // TODO Auto-generated method stub LOG.warn("客戶端收到消息:" + message); if (message.toString().equals("1111")) { //收到心跳包 LOG.warn("收到心跳包"); session.write("1112"); } } @Override public void messageSent(IoSession session, Object message) throws Exception { // TODO Auto-generated method stub super.messageSent(session, message); } @Override public void sessionClosed(IoSession session) throws Exception { // TODO Auto-generated method stub super.sessionClosed(session); } @Override public void sessionCreated(IoSession session) throws Exception { // TODO Auto-generated method stub super.sessionCreated(session); } @Override public void sessionIdle(IoSession session, IdleStatus status) throws Exception { // TODO Auto-generated method stub super.sessionIdle(session, status); } @Override public void sessionOpened(IoSession session) throws Exception { // TODO Auto-generated method stub super.sessionOpened(session); } }
2.寫main方法啟動客戶端,連接服務端:
package com.he.client.minaclient; import java.net.InetSocketAddress; import java.nio.charset.Charset; import org.apache.mina.core.future.ConnectFuture; import org.apache.mina.filter.codec.ProtocolCodecFilter; import org.apache.mina.filter.codec.textline.TextLineCodecFactory; import org.apache.mina.filter.logging.LoggingFilter; import org.apache.mina.transport.socket.nio.NioSocketConnector; public class ClientTest { public static void main(String[] args) throws InterruptedException { //創建客戶端連接器. NioSocketConnector connector = new NioSocketConnector(); connector.getFilterChain().addLast("logger", new LoggingFilter()); connector.getFilterChain().addLast("codec", new ProtocolCodecFilter(new TextLineCodecFactory(Charset.forName("utf-8")))); //設置編碼過濾器 connector.setHandler(new ClientHandler());//設置事件處理器 ConnectFuture cf = connector.connect(new InetSocketAddress("127.0.0.1", 8888));//建立連接 cf.awaitUninterruptibly();//等待連接創建完成 cf.getSession().write("hello,測試!");//發送消息,中英文都有 //cf.getSession().closeOnFlush(); //cf.getSession().getCloseFuture().awaitUninterruptibly();//等待連接斷開 //connector.dispose(); } }
過程也是一樣的,加各種filter,綁定handler。上述代碼運行之后向服務器發送了:“hello,測試”,并且收到返回值:welcome by he。然后每隔5s,就會收到服務端的心跳包:1111。在handler的messageReceived中,確認收到心跳包之后返回1112,實現心跳應答。以上過程,每5s重復一次。
main方法中最后三行注釋掉的代碼如果打開,客戶端在發送完消息之后會主動斷開。
方式二:客戶端不借助于mina,換用java的普通socket來實現,這樣就可以換成其他任何語言:
package com.he.client; import java.io.DataInputStream; import java.io.IOException; import java.io.PrintWriter; import java.net.Socket; /** *@function:java的簡單socket連接,長連接,嘗試連續從服務器獲取消息 *@parameter: *@return: *@date:2016-6-22 下午03:43:18 *@author:he *@notice: */ public class SocketTestTwo { public static final String IP_ADDR = "127.0.0.1";// 服務器地址 public static final int PORT = 8888;// 服務器端口號 static String text = null; public static void main(String[] args) throws IOException { System.out.println("客戶端啟動..."); Socket socket = null; socket = new Socket(IP_ADDR, PORT); PrintWriter os = new PrintWriter(socket.getOutputStream()); os.println("al"); os.println("two"); os.flush(); while (true) { try { // 創建一個流套接字并將其連接到指定主機上的指定端口號 DataInputStream input = new DataInputStream(socket.getInputStream()); // 讀取服務器端數據 byte[] buffer; buffer = new byte[input.available()]; if (buffer.length != 0) { System.out.println("length=" + buffer.length); // 讀取緩沖區 input.read(buffer); // 轉換字符串 String three = new String(buffer); System.out.println("內容=" + three); if (three.equals("1111\n")) { System.out.println("發送返回心跳包"); os = new PrintWriter(socket.getOutputStream()); os.println("1112"); os.flush(); } } } catch (Exception e) { System.out.println("客戶端異常:" + e.getMessage()); os.close(); } } } }
以上代碼運行效果和前一種方式完全一樣。
但是注意此種方法和使用mina結構的客戶端中有一處不同:對于心跳包的判斷。本教程中服務端選用了mina自帶的編解碼器,通過換行符來區分上下兩條消息,也就是每一條消息后面會帶上一個換行符,所以在使用java普通的socket來連接時,判斷心跳包不再是判斷是否為“1111”,而是“1111\n”。對比mina結構的客戶端中并不需要加上換行符是因為客戶端中綁定了相同的編解碼器。
程序運行結果截圖:
服務端:
客戶端:
紅色的打印是mina自帶的打印信息,黑色的是本工程中使用的log4j打印,所以你們的工程應該配置有如下log4j的配置文件才能看到一樣的打印結果:
log4j.rootLogger=WARN,stdout log4j.appender.stdout = org.apache.log4j.ConsoleAppender log4j.appender.stdout.layout = org.apache.log4j.PatternLayout log4j.appender.stdout.Threshold=WARN log4j.appender.stdout.layout.ConversionPattern = [%-5p] [%d{yyyy-MM-dd HH\:mm\:ss,SSS}] [%x] %c %l - %m%n
應大家需求,工程代碼終于抽空放到github了! https://github.com/smile326/minaSpring
以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支持億速云。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。