中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

如何利用Java搭建個簡單的Netty通信

發布時間:2020-07-23 17:10:52 來源:億速云 閱讀:146 作者:小豬 欄目:編程語言

這篇文章主要講解了如何利用Java搭建個簡單的Netty通信,內容清晰明了,對此有興趣的小伙伴可以學習一下,相信大家閱讀完之后會有幫助。

前言

看過dubbo源碼的同學應該都清楚,使用dubbo協議的底層通信是使用的netty進行交互,而最近看了dubbo的Netty部分后,自己寫了個簡單的Netty通信例子。

準備

工程截圖

如何利用Java搭建個簡單的Netty通信

模塊詳解

  • rpc-common

rpc-common作為各個模塊都需使用的模塊,工程中出現的是一些通信時請求的參數以及返回的參數,還有一些序列化的工具。

  • rpc-client

rpc-client中目前只是單單的一個NettyClient啟動類。

  • rpc-server

rpc-client中目前也只是單單的一個NettyServer服務啟動類。

需要的依賴

目前所有的依賴項都出現在 rpc-common 下的 pom.xml中。

<dependencies>
  <!-- Netty -->
  <dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.10.Final</version>
  </dependency>

  <dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-log4j12</artifactId>
    <version>1.7.25</version>
  </dependency>

  <!-- Protostuff -->
  <dependency>
    <groupId>com.dyuproject.protostuff</groupId>
    <artifactId>protostuff-core</artifactId>
    <version>1.0.9</version>
  </dependency>

  <dependency>
    <groupId>com.dyuproject.protostuff</groupId>
    <artifactId>protostuff-runtime</artifactId>
    <version>1.0.9</version>
  </dependency>

  <!-- Objenesis -->
  <dependency>
    <groupId>org.objenesis</groupId>
    <artifactId>objenesis</artifactId>
    <version>2.1</version>
  </dependency>

  <!-- fastjson -->
  <dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.38</version>
  </dependency>
</dependencies>

實現

首先我們在common中先定義本次的Request和Response的基類對象。

public class Request {

  private String requestId;

  private Object parameter;

  public String getRequestId() {
    return requestId;
  }

  public void setRequestId(String requestId) {
    this.requestId = requestId;
  }

  public Object getParameter() {
    return parameter;
  }

  public void setParameter(Object parameter) {
    this.parameter = parameter;
  }
}

public class Response {

  private String requestId;

  private Object result;

  public String getRequestId() {
    return requestId;
  }

  public void setRequestId(String requestId) {
    this.requestId = requestId;
  }

  public Object getResult() {
    return result;
  }

  public void setResult(Object result) {
    this.result = result;
  }
}

使用fastJson進行本次序列化

Netty對象的序列化轉換很好懂, ByteToMessageDecoder 和 MessageToByteEncoder 分別只要繼承它們,重寫方法后,獲取到Object和Byte,各自轉換就OK。

不過如果是有要用到生產上的同學,建議不要使用 fastJson,因為它的漏洞補丁真的是太多了,可以使用google的 protostuff。

public class RpcDecoder extends ByteToMessageDecoder {

  // 目標對象類型進行解碼
  private Class<&#63;> target;

  public RpcDecoder(Class target) {
    this.target = target;
  }

  @Override
  protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
    if (in.readableBytes() < 4) {  // 不夠長度丟棄
      return;
    }
    in.markReaderIndex();  // 標記一下當前的readIndex的位置
    int dataLength = in.readInt(); // 讀取傳送過來的消息的長度。ByteBuf 的readInt()方法會讓他的readIndex增加4

    if (in.readableBytes() < dataLength) { // 讀到的消息體長度如果小于我們傳送過來的消息長度,則resetReaderIndex. 這個配合markReaderIndex使用的。把readIndex重置到mark的地方
      in.resetReaderIndex();
      return;
    }
    byte[] data = new byte[dataLength];
    in.readBytes(data);

    Object obj = JSON.parseObject(data, target);  // 將byte數據轉化為我們需要的對象
    out.add(obj);
  }
}

public class RpcEncoder extends MessageToByteEncoder {

  //目標對象類型進行編碼
  private Class<&#63;> target;

  public RpcEncoder(Class target) {
    this.target = target;
  }

  @Override
  protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception {
    if (target.isInstance(msg)) {
      byte[] data = JSON.toJSONBytes(msg);  // 使用fastJson將對象轉換為byte
      out.writeInt(data.length); // 先將消息長度寫入,也就是消息頭
      out.writeBytes(data);  // 消息體中包含我們要發送的數據
    }
  }

}

NetyServer

public class NettyServerHandler extends ChannelInboundHandlerAdapter {

  @Override
  public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    Request request = (Request) msg;

    System.out.println("Client Data:" + JSON.toJSONString(request));

    Response response = new Response();
    response.setRequestId(request.getRequestId());
    response.setResult("Hello Client !");

    // client接收到信息后主動關閉掉連接
    ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
  }

  @Override
  public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
    ctx.flush();
  }

  @Override
  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    ctx.close();
  }
}

public class NettyServer {

  private static final Logger logger = LoggerFactory.getLogger(NettyServer.class);

  private String ip;
  private int port;

  public NettyServer(String ip, int port) {
    this.ip = ip;
    this.port = port;
  }

  public void server() throws Exception {

    EventLoopGroup bossGroup = new NioEventLoopGroup();
    EventLoopGroup workerGroup = new NioEventLoopGroup();

    try {

      final ServerBootstrap serverBootstrap = new ServerBootstrap();

      serverBootstrap.group(bossGroup, workerGroup)
          .channel(NioServerSocketChannel.class)
          .option(ChannelOption.SO_BACKLOG, 1024)
          .option(ChannelOption.SO_SNDBUF, 32 * 1024)
          .option(ChannelOption.SO_RCVBUF, 32 * 1024)
          .option(ChannelOption.SO_KEEPALIVE, true)
          .childHandler(new ChannelInitializer<SocketChannel>() {
            protected void initChannel(SocketChannel socketChannel) throws Exception {
              socketChannel.pipeline().addLast(new RpcDecoder(Request.class))
                  .addLast(new RpcEncoder(Response.class))
                  .addLast(new NettyServerHandler());
            }
          });

      serverBootstrap.childOption(ChannelOption.SO_KEEPALIVE, true); // 開啟長連接

      ChannelFuture future = serverBootstrap.bind(ip, port).sync();

//      if (future.isSuccess()) {
//
//        new Register().register("/yanzhenyidai/com.yanzhenyidai.server", ip + ":" + port);
//      }

      future.channel().closeFuture().sync();
    } finally {
      bossGroup.shutdownGracefully();
      workerGroup.shutdownGracefully();
    }
  }

  public static void main(String[] args) throws Exception {
    new NettyServer("127.0.0.1", 20000).server();
  }
}

關鍵名詞:

  • EventLoopGroup
    • workerGroup
    • bossGroup
         Server端的EventLoopGroup分為兩個,一般workerGroup作為處理請求,bossGroup作為接收請求。
  • ChannelOption
    • SO_BACKLOG
    • SO_SNDBUF
    • SO_RCVBUF
    • SO_KEEPALIVE
         以上四個常量作為TCP連接中的屬性。
  • ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);

NettyServerHandler中出現的 ChannelFutureListener.CLOSE ,作為Server端主動關閉與Client端的通信,如果沒有主動Close,那么NettyClient將會一直處于阻塞狀態,得不到NettyServer的返回信息。

NettyClient

public class NettyClient extends SimpleChannelInboundHandler<Response> {

  private final String ip;
  private final int port;
  private Response response;

  public NettyClient(String ip, int port) {
    this.ip = ip;
    this.port = port;
  }

  @Override
  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    ctx.close();
  }

  @Override
  protected void channelRead0(ChannelHandlerContext channelHandlerContext, Response response) throws Exception {
    this.response = response;
  }

  public Response client(Request request) throws Exception {
    EventLoopGroup group = new NioEventLoopGroup();

    try {

      // 創建并初始化 Netty 客戶端 Bootstrap 對象
      Bootstrap bootstrap = new Bootstrap();
      bootstrap.group(group);
      bootstrap.channel(NioSocketChannel.class);
      bootstrap.handler(new ChannelInitializer<SocketChannel>() {
        @Override
        public void initChannel(SocketChannel channel) throws Exception {
          ChannelPipeline pipeline = channel.pipeline();

          pipeline.addLast(new RpcDecoder(Response.class));
          pipeline.addLast(new RpcEncoder(Request.class));
          pipeline.addLast(NettyClient.this);
        }
      });
      bootstrap.option(ChannelOption.TCP_NODELAY, true);


//      String[] discover = new Discover().discover("/yanzhenyidai/com.yanzhenyidai.server").split(":");

      // 連接 RPC 服務器
      ChannelFuture future = bootstrap.connect(ip, port).sync();

      // 寫入 RPC 請求數據并關閉連接
      Channel channel = future.channel();

      channel.writeAndFlush(request).sync();
      channel.closeFuture().sync();

      return response;
    } finally {
      group.shutdownGracefully();
    }
  }

  public static void main(String[] args) throws Exception {
    Request request = new Request();
    request.setRequestId(UUID.randomUUID().toString());
    request.setParameter("Hello Server !");
    System.out.println(JSON.toJSONString(new NettyClient("127.0.0.1", 30000).client(request)));
  }
}

測試

如果以上所有內容都準備就緒,那么就可以進行調試了。

啟動順序,先啟動NettyServer,再啟動NettyClient。

總結

記得剛出來工作時,有工作很多年的同事問我了不了解Netty,當時工作太短,直說聽過Putty,現在回想起來真的挺丟人的,哈哈。&#128523;

Netty作為通信框架,如果你了解TCP,而且項目中有類似傳輸信息的需求,又不想集成HTTP或者Socket,那么Netty真的挺實用的。

看完上述內容,是不是對如何利用Java搭建個簡單的Netty通信有進一步的了解,如果還想學習更多內容,歡迎關注億速云行業資訊頻道。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

黎平县| 宝坻区| 正镶白旗| 五河县| 龙南县| 汶上县| 高台县| 台湾省| 长寿区| 荆州市| 瑞安市| 玛多县| 桦甸市| 三门峡市| 金阳县| 新余市| 白河县| 海淀区| 方正县| 绥滨县| 鱼台县| 博白县| 成武县| 保康县| 繁昌县| 平山县| 贡嘎县| 布拖县| 介休市| 同仁县| 化州市| 新疆| 三台县| 太湖县| 迭部县| 监利县| 康马县| 东台市| 阆中市| 西安市| 翁牛特旗|