中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

怎么基于Kotlin實現一個簡單的TCP自定義協議

發布時間:2021-10-28 13:42:49 來源:億速云 閱讀:140 作者:iii 欄目:web開發

這篇文章主要講解了“怎么基于Kotlin實現一個簡單的TCP自定義協議”,文中的講解內容簡單清晰,易于學習與理解,下面請大家跟著小編的思路慢慢深入,一起來研究和學習“怎么基于Kotlin實現一個簡單的TCP自定義協議”吧!

自定義通訊協議

首先,需要設計一個通用的 TCP 網絡協議。

網絡協議結構如下

+--------------+---------------+------------+---------------+-----------+----------+      | 魔數(4)       | version(1)    |序列化方式(1) | command(1)    |數據長度(4) |數據(n)    |      +--------------+---------------+------------+---------------+-----------+----------+
  • 魔數:4字節,本項目中使用  20200803(這一天編寫的日子),為了防止該端口被意外調用,我們在收到報文后取前4個字節與魔數比對,如果不相同則直接拒絕并關閉連接。

  • 版本號:1字節,僅表示協議的版本號,便于協議升級時使用

  • 序列化方式:1字節,表示如何將 Java 對象轉化為二進制數據,以及如何反序列化。

  • 指令:1字節,表示該消息的意圖(如拍照、拍視頻、心跳、App 升級等)。最多支持 2^8 種指令。

  • 數據長度:4字節,表示該字段后數據部分的長度。最多支持 2^32 位。

  • 數據:具體數據的內容。

根據上述所設計的網絡協議,定義一個抽象類 Packet:

abstract class Packet {     var magic:Int? = MAGIC_NUMBER     // 魔數     var version:Byte = 1              // 版本號,當前協議的版本號為 1     abstract val serializeMethod:Byte // 序列化方式     abstract val command:Byte         // Watcher 跟 App 相互通訊的指令 }

有多少個指令就需要定義多少個 Packet,下面以心跳的 Packet 為例,定義一個 HeartBeatPacket:

data class HeartBeatPacket(var msg:String = "ping",                            override val serializeMethod: Byte = Serialize.JSON,                            override val command: Byte = Commands.HEART_BEAT) : Packet() { }

HeartBeatPacket 是由 TCP 客戶端發起,由 TCP 服務端接收并返回給客戶端。

每個 Packet 類都包含了該 Packet 所使用的序列化方式。

/**  * 序列化方式的常量列表  */ interface Serialize {     companion object {         const val JSON: Byte = 0     }}

每個 Packet 也包含了其對應的 command。下面是 Commands 是指令集,支持256個指令。

/**  * 指令集,支持從 -128 到 127 總共 256 個指令  */ interface Commands {     companion object {         /**          * 心跳包          */         const val HEART_BEAT: Byte = 0         /**          * 登錄(App 需要告訴 Watcher :cameraPosition 的位置)          */         const val LOGIN: Byte = 1         ......   }}

由于使用自定義的協議,必須要有對報文的 encode、decode,PacketManager 負責這些事情。

encode 時按照協議的結構進行組裝報文,同理 decode 是其逆向的過程。

/**  * 報文的管理類,對報文進行 encode、decode  */ object PacketManager {     fun encode(packet: Packet):ByteBuf = encode(ByteBufAllocator.DEFAULT, packet)     fun encode(alloc:ByteBufAllocator, packet: Packet) = encode(alloc.ioBuffer(), packet)     fun encode(buf: ByteBuf, packet: Packet): ByteBuf {         val serializer = SerializerFactory.getSerializer(packet.serializeMethod)         val bytes: ByteArray = serializer.serialize(packet)         //組裝報文:魔數(4字節)+ 版本號(1字節)+ 序列化方式(1字節)+ 指令(1字節)+ 數據長度(4字節)+ 數據(N字節)         buf.writeInt(MAGIC_NUMBER)         buf.writeByte(packet.version.toInt())         buf.writeByte(packet.serializeMethod.toInt())         buf.writeByte(packet.command.toInt())         buf.writeInt(bytes.size)         buf.writeBytes(bytes)         return buf     }     fun decode(buf:ByteBuf): Packet {         buf.skipBytes(4) // 魔數由單獨的 Handler 進行校驗         buf.skipBytes(1)         val serializationMethod = buf.readByte()         val serializer = SerializerFactory.getSerializer(serializationMethod)         val command = buf.readByte()         val clazz = PacketFactory.getPacket(command)         val length = buf.readInt()  // 數據的長度         val bytes = ByteArray(length)   // 定義需要讀取的字符數組         buf.readBytes(bytes)         return serializer.deserialize(clazz, bytes)     } }

TCP 服務端

啟動 TCP 服務的方法

fun execute() {     boss = NioEventLoopGroup()        worker = NioEventLoopGroup()        val bootstrap = ServerBootstrap()     bootstrap.group(boss, worker).channel(NioServerSocketChannel::class.java)             .option(ChannelOption.SO_BACKLOG, 100)             .childOption(ChannelOption.SO_KEEPALIVE, true)             .childOption(ChannelOption.SO_REUSEADDR, true)             .childOption(ChannelOption.TCP_NODELAY, true)             .childHandler(object : ChannelInitializer<NioSocketChannel>() {                 @Throws(Exception::class)                 override fun initChannel(nioSocketChannel: NioSocketChannel) {                     val pipeline = nioSocketChannel.pipeline()                     pipeline.addLast(ServerIdleHandler())                        pipeline.addLast(MagicNumValidator())                        pipeline.addLast(PacketCodecHandler)                        pipeline.addLast(HeartBeatHandler)                        pipeline.addLast(ResponseHandler)                    }                })        val future: ChannelFuture = bootstrap.bind(TCP_PORT)     future.addListener(object : ChannelFutureListener {         @Throws(Exception::class)         override fun operationComplete(channelFuture: ChannelFuture) {             if (channelFuture.isSuccess) {                 logInfo(logger, "TCP Server is starting...")             } else {                 logError(logger,channelFuture.cause(),"TCP Server failed")             }            }        })    }

其中,ServerIdleHandler: 表示 5 分鐘內沒有收到心跳,則斷開連接。

class ServerIdleHandler : IdleStateHandler(0, 0, HERT_BEAT_TIME) {     private val logger: Logger = LoggerFactory.getLogger(ServerIdleHandler::class.java)     @Throws(Exception::class)     override fun channelIdle(ctx: ChannelHandlerContext, evt: IdleStateEvent) {         logInfo(logger) {            ctx.channel().close()            "$HERT_BEAT_TIME 秒內沒有收到心跳,則斷開連接"         }    }    companion object {         private const val HERT_BEAT_TIME = 300     }}

MagicNumValidator:用于 TCP 報文的魔數校驗。

class MagicNumValidator : LengthFieldBasedFrameDecoder(Int.MAX_VALUE, LENGTH_FIELD_OFFSET, LENGTH_FIELD_LENGTH) {     private val logger: Logger = LoggerFactory.getLogger(this.javaClass)     @Throws(Exception::class)     override fun decode(ctx: ChannelHandlerContext, `in`: ByteBuf): Any? {         if (`in`.getInt(`in`.readerIndex()) !== MAGIC_NUMBER) { // 魔數校驗不通過,則關閉連接             logInfo(logger,"魔數校驗失敗")             ctx.channel().close()             return null         }         return super.decode(ctx, `in`)     }     companion object {         private const val LENGTH_FIELD_OFFSET = 7         private const val LENGTH_FIELD_LENGTH = 4     } }

PacketCodecHandler: 解析報文的 Handler。

PacketCodecHandler 繼承自 ByteToMessageCodec ,它是用來處理 byte-to-message  和message-to-byte,便于解碼字節消息成 POJO 或編碼 POJO 消息成字節。

@ChannelHandler.Sharable object PacketCodecHandler : MessageToMessageCodec<ByteBuf, Packet>() {    override fun encode(ctx: ChannelHandlerContext, msg: Packet, list: MutableList<Any>) {         val byteBuf = ctx.channel().alloc().ioBuffer()         PacketManager.encode(byteBuf, msg)        list.add(byteBuf)    }    override fun decode(ctx: ChannelHandlerContext, msg: ByteBuf, list: MutableList<Any>) {         list.add(PacketManager.decode(msg));    }}

HeartBeatHandler:心跳的 Handler,接收 TCP 客戶端發來的"ping",然后給客戶端返回"pong"。

@ChannelHandler.Sharable object HeartBeatHandler : SimpleChannelInboundHandler<HeartBeatPacket>(){    private val logger: Logger = LoggerFactory.getLogger(this.javaClass)     override fun channelRead0(ctx: ChannelHandlerContext, msg: HeartBeatPacket) {         logInfo(logger,"收到心跳包:${GsonUtils.toJson(msg)}")         msg.msg = "pong" // 返回 pong 給到客戶端         ctx.writeAndFlush(msg)     } }

ResponseHandler:通用的處理接收 TCP 客戶端發來指令的 Handler,可以根據對應的指令去查詢對應的 Handler  并處理其命令。

object ResponseHandler: SimpleChannelInboundHandler<Packet>() {     private val logger: Logger = LoggerFactory.getLogger(this.javaClass)     private val handlerMap: ConcurrentHashMap<Byte, SimpleChannelInboundHandler<out Packet>> = ConcurrentHashMap()     init {         handlerMap[LOGIN] = LoginHandler        ......        handlerMap[ERROR] = ErrorHandler    }    override fun channelRead0(ctx: ChannelHandlerContext, msg: Packet) {         logInfo(logger,"收到客戶端的指令: ${msg.command}")         val handler: SimpleChannelInboundHandler<out Packet>? = handlerMap[msg.command]         handler?.let {            logInfo(logger,"找到響應指令的 Handler: ${it.javaClass.simpleName}")             it.channelRead(ctx, msg)        } ?: logInfo(logger,"未找到響應指令的 Handler")     }    @Throws(Exception::class)     override fun channelInactive(ctx: ChannelHandlerContext) {         val insocket = ctx.channel().remoteAddress() as InetSocketAddress         val clientIP = insocket.address.hostAddress         val clientPort = insocket.port         logError(logger,"客戶端掉線: $clientIP : $clientPort")         super.channelInactive(ctx)     }}

TCP 客戶端

模擬一個客戶端的實現

val topLevelClass = object : Any() {}.javaClass.enclosingClass val logger: Logger = LoggerFactory.getLogger(topLevelClass)fun main() {     val worker = NioEventLoopGroup()     val bootstrap = Bootstrap()     bootstrap.group(worker).channel(NioSocketChannel::class.java)             .handler(object : ChannelInitializer<SocketChannel>() {                 @Throws(Exception::class)                 override fun initChannel(channel: SocketChannel) {                     channel.pipeline().addLast(PacketCodecHandler)                    channel.pipeline().addLast(ClientIdleHandler())                    channel.pipeline().addLast(ClientLogin())                }            })    val future: ChannelFuture = bootstrap.connect("127.0.0.1", TCP_PORT).addListener(object : ChannelFutureListener {         @Throws(Exception::class)         override fun operationComplete(channelFuture: ChannelFuture) {             if (channelFuture.isSuccess()) {                 logInfo(logger,"connect to server success!")             } else {                 logger.info("failed to connect the server! ")                 System.exit(0)             }        }    })    try {         future.channel().closeFuture().sync()        logInfo(logger,"與服務端斷開連接!")     } catch (e: InterruptedException) {         e.printStackTrace()    }}

其中,PacketCodecHandler 跟服務端使用的解析報文的 Handler 是一樣的。

ClientIdleHandler:客戶端實現心跳,每隔 30 秒發送一次心跳。

class ClientIdleHandler : IdleStateHandler(0, 0, HEART_BEAT_TIME) {     private val logger = LoggerFactory.getLogger(ClientIdleHandler::class.java)     @Throws(Exception::class)     override fun channelIdle(ctx: ChannelHandlerContext, evt: IdleStateEvent?) {         logInfo(logger,"發送心跳....")         ctx.writeAndFlush(HeartBeatPacket())    }    companion object {         private const val HEART_BEAT_TIME = 30     }}

ClientLogin:登錄服務端的 Handler。

@ChannelHandler.Sharable class ClientLogin: ChannelInboundHandlerAdapter() {    private val logger: Logger = LoggerFactory.getLogger(this.javaClass)     @Throws(Exception::class)     override fun channelActive(ctx: ChannelHandlerContext) {         val packet: LoginPacket = LoginPacket()         logInfo(logger,"packet = ${GsonUtils.toJson(packet)}")         val byteBuf = PacketManager.encode(packet)         ctx.channel().writeAndFlush(byteBuf)    }}

感謝各位的閱讀,以上就是“怎么基于Kotlin實現一個簡單的TCP自定義協議”的內容了,經過本文的學習后,相信大家對怎么基于Kotlin實現一個簡單的TCP自定義協議這一問題有了更深刻的體會,具體使用情況還需要大家實踐驗證。這里是億速云,小編將為大家推送更多相關知識點的文章,歡迎關注!

向AI問一下細節
推薦閱讀:
  1. TCP/IP協議
  2. TCP協議分析

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

鄂托克旗| 托克逊县| 富源县| 会宁县| 松江区| 堆龙德庆县| 莲花县| 应城市| 松滋市| 治多县| 汕头市| 盐池县| 文水县| 江陵县| 昭觉县| 通化市| 桐梓县| 离岛区| 太湖县| 沂水县| 济宁市| 太白县| 余庆县| 无棣县| 沛县| 乌拉特后旗| 萍乡市| 万年县| 绥化市| 南涧| 高州市| 扶余县| 堆龙德庆县| 清镇市| 东乡| 临澧县| 通榆县| 崇文区| 微山县| 黄平县| 尚义县|