中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Socket結合線程池怎么實現客戶端和服務端通信demo

發布時間:2022-03-11 09:06:55 來源:億速云 閱讀:138 作者:iii 欄目:開發技術

本篇內容主要講解“Socket結合線程池怎么實現客戶端和服務端通信demo”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實用性強。下面就讓小編來帶大家學習“Socket結合線程池怎么實現客戶端和服務端通信demo”吧!

    1、要求

    可以使用 Socket 和 ServiceSocket 以及其它 API;

    寫一個客戶端和服務端之間 TCP 通信的例子;

    服務端處理任務需要異步處理;

    因為服務端處理能力很弱,只能同時處理 5 個請求,當第六個請求到達服務器時,需要服務器返回明確的錯誤信息:服務器太忙了,請稍后重試~。

    需求比較簡單,唯一復雜的地方在于第四點,我們需要對客戶端的請求量進行控制,首先我們需要確認的是,我們是無法控制客戶端發送的請求數的,所以我們只能從服務端進行改造,比如從服務端進行限流。

    有的同學可能很快想到,我們應該使用 ServerSocket 的 backlog 的屬性,把其設置成 5,但我們在上一章中說到 backlog 并不能準確代表限制的客戶端連接數,而且我們還要求服務端返回具體的錯誤信息,即使 backlog 生效,也只會返回固定的錯誤信息,不是我們定制的錯誤信息。

    我們好好想想,線程池似乎可以做這個事情,我們可以把線程池的 coreSize 和 maxSize 都設置成 4,把隊列大小設置成 1,這樣服務端每次收到請求后,會先判斷一下線程池中的隊列有沒有數據,如果有的話,說明當前服務器已經馬上就要處理第五個請求了,當前請求就是第六個請求,應該被拒絕。

    正好線程池的加入也可以滿足第三點,服務端的任務可以異步執行。

    2、客戶端代碼

    客戶端的代碼比較簡單,直接向服務器請求數據即可,代碼如下:

    public class SocketClient {
      private static final Integer SIZE = 1024;
      private static final ThreadPoolExecutor socketPoll = new ThreadPoolExecutor(50, 50,
                                                                                   365L,
                                                                                   TimeUnit.DAYS,
                                                                                   new LinkedBlockingQueue<>(400));
      @Test
      public void test() throws InterruptedException {
        // 模擬客戶端同時向服務端發送 6 條消息
        for (int i = 0; i < 6; i++) {
          socketPoll.submit(() -> {
            send("localhost", 7007, "nihao");
          });
        }
        Thread.sleep(1000000000);
      }
      /**
       * 發送tcp
       *
       * @param domainName 域名
       * @param port       端口
       * @param content    發送內容
       */
      public static String send(String domainName, int port, String content) {
        log.info("客戶端開始運行");
        Socket socket = null;
        OutputStream outputStream = null;
        InputStreamReader isr = null;
        BufferedReader br = null;
        InputStream is = null;
        StringBuffer response = null;
        try {
          if (StringUtils.isBlank(domainName)) {
            return null;
          }
          // 無參構造器初始化 Socket,默認底層協議是 TCP
          socket = new Socket();
          socket.setReuseAddress(true);
          // 客戶端準備連接服務端,設置超時時間 10 秒
          socket.connect(new InetSocketAddress(domainName, port), 10000);
          log.info("TCPClient 成功和服務端建立連接");
          // 準備發送消息給服務端
          outputStream = socket.getOutputStream();
          // 設置 UTF 編碼,防止亂碼
          byte[] bytes = content.getBytes(Charset.forName("UTF-8"));
          // 輸出字節碼
          segmentWrite(bytes, outputStream);
          // 關閉輸出
          socket.shutdownOutput();
          log.info("TCPClient 發送內容為 {}",content);
          // 等待服務端的返回
          socket.setSoTimeout(50000);//50秒還沒有得到數據,直接斷開連接
          // 得到服務端的返回流
          is = socket.getInputStream();
          isr = new InputStreamReader(is);
          br = new BufferedReader(isr);
          // 從流中讀取返回值
          response = segmentRead(br);
          // 關閉輸入流
          socket.shutdownInput();
          //關閉各種流和套接字
          close(socket, outputStream, isr, br, is);
          log.info("TCPClient 接受到服務端返回的內容為 {}",response);
          return response.toString();
        } catch (ConnectException e) {
          log.error("TCPClient-send socket連接失敗", e);
          throw new RuntimeException("socket連接失敗");
        } catch (Exception e) {
          log.error("TCPClient-send unkown errror", e);
          throw new RuntimeException("socket連接失敗");
        } finally {
          try {
            close(socket, outputStream, isr, br, is);
          } catch (Exception e) {
            // do nothing
          }
        }
      }
      /**
       * 關閉各種流
       *
       * @param socket
       * @param outputStream
       * @param isr
       * @param br
       * @param is
       * @throws IOException
       */
      public static void close(Socket socket, OutputStream outputStream, InputStreamReader isr,
                               BufferedReader br, InputStream is) throws IOException {
        if (null != socket && !socket.isClosed()) {
          try {
            socket.shutdownOutput();
          } catch (Exception e) {
          }
          try {
            socket.shutdownInput();
          } catch (Exception e) {
          }
          try {
            socket.close();
          } catch (Exception e) {
          }
        }
        if (null != outputStream) {
          outputStream.close();
        }
        if (null != br) {
          br.close();
        }
        if (null != isr) {
          isr.close();
        }
        if (null != is) {
          is.close();
        }
      }
      /**
       * 分段讀
       *
       * @param br
       * @throws IOException
       */
      public static StringBuffer segmentRead(BufferedReader br) throws IOException {
        StringBuffer sb = new StringBuffer();
        String line;
        while ((line = br.readLine()) != null) {
          sb.append(line);
        }
        return sb;
      }
      /**
       * 分段寫
       *
       * @param bytes
       * @param outputStream
       * @throws IOException
       */
      public static void segmentWrite(byte[] bytes, OutputStream outputStream) throws IOException {
        int length = bytes.length;
        int start, end = 0;
        for (int i = 0; end != bytes.length; i++) {
          start = i == 0 ? 0 : i * SIZE;
          end = length > SIZE ? start + SIZE : bytes.length;
          length -= SIZE;
          outputStream.write(bytes, start, end - start);
          outputStream.flush();
        }
      }
    }

    客戶端代碼中我們也用到了線程池,主要是為了并發模擬客戶端一次性發送 6 個請求,按照預期服務端在處理第六個請求的時候,會返回特定的錯誤信息給客戶端。

    以上代碼主要方法是 send 方法,主要處理像服務端發送數據,并處理服務端的響應。

    3、服務端代碼

    服務端的邏輯分成兩個部分,第一部分是控制客戶端的請求個數,當超過服務端的能力時,拒絕新的請求,當服務端能力可響應時,放入新的請求,第二部分是服務端任務的執行邏輯。

    3.1、對客戶端請求進行控制

    public class SocketServiceStart {
      /**
       * 服務端的線程池,兩個作用
       * 1:讓服務端的任務可以異步執行
       * 2:管理可同時處理的服務端的請求數
       */
      private static final ThreadPoolExecutor collectPoll = new ThreadPoolExecutor(4, 4,
                                                                                   365L,
                                                                                   TimeUnit.DAYS,
                                                                                   new LinkedBlockingQueue<>(
                                                                                       1));
      @Test
      public void test(){
        start();
      }
      /**
       * 啟動服務端
       */
      public static final void start() {
        log.info("SocketServiceStart 服務端開始啟動");
        try {
          // backlog  serviceSocket處理阻塞時,客戶端最大的可創建連接數,超過客戶端連接不上
          // 當線程池能力處理滿了之后,我們希望盡量阻塞客戶端的連接
    //      ServerSocket serverSocket = new ServerSocket(7007,1,null);
          // 初始化服務端
          ServerSocket serverSocket = new ServerSocket();
          serverSocket.setReuseAddress(true);
    //      serverSocket.bind(new InetSocketAddress(InetAddress.getLocalHost().getHostAddress(), 80));
          serverSocket.bind(new InetSocketAddress("localhost", 7007));
          log.info("SocketServiceStart 服務端啟動成功");
          // 自旋,讓客戶端一直在取客戶端的請求,如果客戶端暫時沒有請求,會一直阻塞
          while (true) {
            // 接受客戶端的請求
            Socket socket = serverSocket.accept();
            // 如果隊列中有數據了,說明服務端已經到了并發處理的極限了,此時需要返回客戶端有意義的信息
            if (collectPoll.getQueue().size() >= 1) {
              log.info("SocketServiceStart 服務端處理能力到頂,需要控制客戶端的請求");
              //返回處理結果給客戶端
              rejectRequest(socket);
              continue;
            }
            try {
              // 異步處理客戶端提交上來的任務
              collectPoll.submit(new SocketService(socket));
            } catch (Exception e) {
              socket.close();
            }
          }
        } catch (Exception e) {
          log.error("SocketServiceStart - start error", e);
          throw new RuntimeException(e);
        } catch (Throwable e) {
          log.error("SocketServiceStart - start error", e);
          throw new RuntimeException(e);
        }
      }
    	// 返回特定的錯誤碼給客戶端
      public static void rejectRequest(Socket socket) throws IOException {
        OutputStream outputStream = null;
        try{
          outputStream = socket.getOutputStream();
          byte[] bytes = "服務器太忙了,請稍后重試~".getBytes(Charset.forName("UTF-8"));
          SocketClient.segmentWrite(bytes, outputStream);
          socket.shutdownOutput();
        }finally {
          //關閉流
          SocketClient.close(socket,outputStream,null,null,null);
        }
      }
    }

    我們使用 collectPoll.getQueue().size() >= 1 來判斷目前服務端是否已經到達處理的極限了,如果隊列中有一個任務正在排隊,說明當前服務端已經超負荷運行了,新的請求應該拒絕掉,如果隊列中沒有數據,說明服務端還可以接受新的請求。

    以上代碼注釋詳細,就不累贅說了。

    3.2、服務端任務的處理邏輯

    服務端的處理邏輯比較簡單,主要步驟是:從客戶端的 Socket 中讀取輸入,進行處理,把響應返回給客戶端。

    我們使用線程沉睡 2 秒來模擬服務端的處理邏輯,代碼如下:

    public class SocketService implements Runnable {
      private Socket socket;
      public SocketService() {
      }
      public SocketService(Socket socket) {
        this.socket = socket;
      }
      @Override
      public void run() {
        log.info("SocketService 服務端任務開始執行");
        OutputStream outputStream = null;
        InputStream is = null;
        InputStreamReader isr = null;
        BufferedReader br = null;
        try {
          //接受消息
          socket.setSoTimeout(10000);// 10秒還沒有得到數據,直接斷開連接
          is = socket.getInputStream();
          isr = new InputStreamReader(is,"UTF-8");
          br = new BufferedReader(isr);
          StringBuffer sb = SocketClient.segmentRead(br);
          socket.shutdownInput();
          log.info("SocketService accept info is {}", sb.toString());
          //服務端處理 模擬服務端處理耗時
          Thread.sleep(2000);
          String response  = sb.toString();
          //返回處理結果給客戶端
          outputStream = socket.getOutputStream();
          byte[] bytes = response.getBytes(Charset.forName("UTF-8"));
          SocketClient.segmentWrite(bytes, outputStream);
          socket.shutdownOutput();
          //關閉流
          SocketClient.close(socket,outputStream,isr,br,is);
          log.info("SocketService 服務端任務執行完成");
        } catch (IOException e) {
          log.error("SocketService IOException", e);
        } catch (Exception e) {
          log.error("SocketService Exception", e);
        } finally {
          try {
            SocketClient.close(socket,outputStream,isr,br,is);
          } catch (IOException e) {
            log.error("SocketService IOException", e);
          }
        }
      }
    }

    4、測試

    測試的時候,我們必須先啟動服務端,然后再啟動客戶端,首先我們啟動服務端,打印日志如下:

    Socket結合線程池怎么實現客戶端和服務端通信demo

    接著我們啟動客戶端,打印日志如下:

    Socket結合線程池怎么實現客戶端和服務端通信demo

    我們最后看一下服務端的運行日志: 

    Socket結合線程池怎么實現客戶端和服務端通信demo

     從以上運行結果中,我們可以看出得出的結果是符合我們預期的,服務端在請求高峰時,能夠并發處理5個請求,其余請求可以用正確的提示進行拒絕。

    到此,相信大家對“Socket結合線程池怎么實現客戶端和服務端通信demo”有了更深的了解,不妨來實際操作一番吧!這里是億速云網站,更多相關內容可以進入相關頻道進行查詢,關注我們,繼續學習!

    向AI問一下細節

    免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

    AI

    闸北区| 中卫市| 竹山县| 高平市| 文安县| 沙坪坝区| 育儿| 花莲市| 华容县| 偃师市| 保山市| 兴隆县| 仙游县| 云阳县| 陆良县| 道真| 福建省| 炎陵县| 崇左市| 丰宁| 武威市| 娱乐| 屏东县| 新竹市| 策勒县| 重庆市| 子长县| 千阳县| 都江堰市| 轮台县| 盐池县| 苍山县| 榆树市| 格尔木市| 攀枝花市| 赞皇县| 依兰县| 新津县| 乌拉特前旗| 公主岭市| 乐东|