您好,登錄后才能下訂單哦!
今天就跟大家聊聊有關K8S中如何利用Exec Websocket接口實現Pod間的文件拷貝,可能很多人都不太了解,為了讓大家更加了解,小編給大家總結了以下內容,希望大家根據這篇文章可以有所收獲。
需求
想想咱們遇到以下問題一般怎么解決?
新建了一個Pod, 想把另外一個Pod中的文件拷貝到新Pod中進行分析, 怎么實現呢?
如何在項目中, 如何像kubectl cp拷貝文件一樣, 實現Pod間文件拷貝呢?
新Pod與實例Pod共享pvc? 或者封裝一個帶認證上下文的kubectl執行命令行?
簡介
流程說明
首先初始化信號通道, 用于協程間的信號通知, 收到信號的協程執行暫停/退出循環/關閉通道等操作
初始化數據通道srcStdOutCh, 類型為字節數組[]byte, 用于將源Pod的標準輸出放入通道, 發送給目的Pod標準輸入的數據就是從該數據通道中讀取
拼接exec接口的訪問地址(集群連接,token), tar壓縮命令, 標準輸入/輸出,tty, pod名,容器名等參數. tar czf - /var/log/xxx.log 表示將該文件樹結構壓縮為數據流
調用websocket的Dialer方法與源Pod容器建立websocket連接, 并開啟協程將標準輸出寫入數據通道srcStdOutCh
參考源pod exec接口, 拼接目的Pod exec訪問連接, tar xzf - -C /tmp表示從標準輸入讀取數據流, 并解壓成文件樹結構(注意:解壓后包含文件目錄樹結構)
與目的Pod建立wss連接, 開啟協程從數據通道srcStdOutCh中讀取源Pod標準輸出, 并寫入目的Pod的標準輸入, 如果從數據通道讀取超時,則表示數據已經傳輸完畢, 此時停止向目的容器輸入數據, 并發送通知信號, 通知主協程可以退出,關閉源Pod的wss連接
注意事項
wesocket連上源Pod時, 標準輸出中會輸出空數據, tar命令輸出等干擾數據, 所以接收數據的時候需要傳入一個過濾器回調函數, 用于數據過濾
向目的容器發送數據時, 需要將源容器收到的第一個字節刪除, 一般為1, 表示標準輸出標識, 發送給目的容器是不需要該字節的
發送數據時, 需要設置第一個字節為0, 表示發送到標準輸入
參考代碼
cp.go
/* 總結: 1.不帶緩沖的通道需要先讀后寫 2.websocket ReadMessage方法是阻塞讀取的, 如果要中斷讀取, 關閉連接, 捕獲錯誤即可 */ package cpFilePod2Pod import ( "crypto/tls" "errors" "fmt" "log" "net/url" "regexp" "strings" "sync" "time" "github.com/gorilla/websocket" ) // 定義過濾器回調函數 type filterCallback func(input string) bool // 帶有互斥鎖的Websocket連接對象 type WsConn struct { Conn *websocket.Conn mu sync.Mutex } // 發送字符串, 自動添加換行符 func (self *WsConn) Send(sender string, str string) { self.mu.Lock() defer self.mu.Unlock() // 利用k8s exec websocket接口發送數據時, 第一個字節需要設置為0, 表示將數據發送到標準輸入 data := []byte{0} data = append(data, []byte(str+"\n")...) err := self.Conn.WriteMessage(websocket.BinaryMessage, data) //發送二進制數據類型 if err != nil { log.Printf("發送錯誤, %s", err.Error()) } log.Printf("%s, 數據:%s, 字節:%+v", sender, str, []byte(str+"\n")) } //發送字符串, 不添加換行符, 內部做字節過濾,等操作 func (self *WsConn) SendWithFilter(sender string, str string) { self.mu.Lock() defer self.mu.Unlock() // log.Printf("向目的容器發送數據:%s", str) str = strings.ReplaceAll(str, "\r\n", "\n") // /r=13, /n=10, windows換行符轉Linux換行符 //去掉第一個字節(標準輸出1, byte:[0 1 ...]), 因為從源容器輸出的字節中, 第一位標識了標準輸出1, 給目的容器發送字節時, 需要去除該標志 //當WebSocket建立連接后,發送數據時需要在字節Buffer第一個字節設置為stdin(buf[0] = 0),而接受數據時, 需要判斷第一個字節, stdout(buf[0] = 1)或stderr(buf[0] = 2) strByte := append([]byte(str)[:0], []byte(str)[1:]...) data := []byte{0} data = append(data, strByte...) err := self.Conn.WriteMessage(websocket.BinaryMessage, data) log.Printf("向目的容器標準輸入發送數據:\n%s, 字節數:%d, 字節:%+v", string(data), len(data), data) if err != nil { log.Printf("發送錯誤, %s", err.Error()) } } //從連接中獲取數據流, 并寫入字節數組通道中, 內部執行過濾器(回調函數) func (self *WsConn) Receive(receiver string, ch chan []byte, filter filterCallback) error { self.mu.Lock() defer self.mu.Unlock() msgType, msgByte, err := self.Conn.ReadMessage() //阻塞讀取, 類型為2表示二進制數據, 1表示文本, -1表示連接已關閉:websocket: close 1000 (normal) log.Printf("%s, 讀取到數據:%s, 類型:%d, 字節數:%d, 字節:%+v", receiver, string(msgByte), msgType, len(msgByte), msgByte) if err != nil { log.Printf("%s, 讀取出錯, %s", receiver, err.Error()) return err } if filter(string(msgByte)) && len(msgByte) > 1 { ch <- msgByte } else { log.Printf("%s, 數據不滿足, 直接丟棄數據, 字符:%s, 字節數:%d, 字節:%v", receiver, string(msgByte), len(msgByte), msgByte) } return nil } func NewWsConn(host string, path string, params map[string]string, headers map[string][]string) (*websocket.Conn, error) { paramArray := []string{} for k, v := range params { paramArray = append(paramArray, fmt.Sprintf("%s=%s", k, v)) } u := url.URL{Scheme: "wss", Host: host, Path: path, RawQuery: strings.Join(paramArray, "&")} log.Printf("API:%s", u.String()) dialer := websocket.Dialer{TLSClientConfig: &tls.Config{RootCAs: nil, InsecureSkipVerify: true}} conn, _, err := dialer.Dial(u.String(), headers) if err != nil { return nil, errors.New(fmt.Sprintf("連接錯誤:%s", err.Error())) } return conn, nil } //核心: tar -cf - 將具有文件夾結構的數據轉換成數據流, 通過 tar -xf - 將數據流轉換成 linux 文件系統 func CpPod2Pod() { //通知主函數可以退出的信號通道 signalExit := make(chan bool, 1) defer close(signalExit) //下發不要給目的容器發送數據的信號 signalStopDstSend := make(chan bool, 1) defer close(signalStopDstSend) //下發不要從源容器讀取數據的信號 signalStopSrcRead := make(chan bool, 1) defer close(signalStopSrcRead) //下發不要從目的容器讀取數據的信號 signalStopDstRead := make(chan bool, 1) defer close(signalStopDstRead) //下發不要打印目的容器的輸出數據 signalStopPrintDstStdout := make(chan bool, 1) defer close(signalStopPrintDstStdout) //連接pod host := "172.16.xxx.xxx:6443" token := "xxx" headers := map[string][]string{"authorization": {fmt.Sprintf("Bearer %s", token)}} pathSrc := "/api/v1/namespaces/xxx/pods/xxx/exec" commandSrc := "tar&command=czf&command=-&command=/var/log/mysql/slow.log" //tar czf - sourceFile paraSrc := map[string]string{"stdout": "1", "stdin": "0", "stderr": "1", "tty": "0", "container": "xxx", "command": commandSrc} srcConn, err := NewWsConn(host, pathSrc, paraSrc, headers) if err != nil { log.Printf("源Pod連接出錯, %s", err.Error()) } pathDst := "/api/v1/namespaces/xxx/pods/xxx/exec" commandDst := "tar&command=xzf&command=-&command=-C&command=/tmp" // tar xzf - -C /tmp // paraDst := map[string]string{"stdout": "1", "stdin": "1", "stderr": "1", "tty": "0", "container": "xxx", "command": commandDst} paraDst := map[string]string{"stdout": "0", "stdin": "1", "stderr": "0", "tty": "0", "container": "xxx", "command": commandDst} //關閉目的Pod標準輸出和錯誤輸出 dstConn, err := NewWsConn(host, pathDst, paraDst, headers) if err != nil { log.Printf("目的Pod連接出錯, %s", err.Error()) } wsSrc := WsConn{ Conn: srcConn, } wsDst := WsConn{ Conn: dstConn, } defer srcConn.Close() defer dstConn.Close() srcStdOutCh := make(chan []byte, 2048) dstStdOutCh := make(chan []byte) defer close(srcStdOutCh) defer close(dstStdOutCh) // 接收源容器標準輸出到數據通道中 go func() { i := 1 for { log.Printf("第%d次, 從源容器讀取標準輸出", i) i++ //定義匿名過濾器回調方法, 對源容器標準輸出中不需要的數據進行過濾 err := wsSrc.Receive("源容器", srcStdOutCh, func(input string) bool { if input == "cat /var/log/mysql/slow.log" { return false // } else if match, _ := regexp.MatchString("root@(.+)#", input); match { // return false // } else if match, _ := regexp.MatchString("cat /(.+).log", input); match { // return false // } else if match, _ := regexp.MatchString("cat /tmp/(.+)", input); match { // return false } else if match, _ := regexp.MatchString("tar: Removing leading(.+)", input); match { return false } else if len(input) == 0 { //過濾空消息 // log.Printf("讀取到標準錯誤輸出") return false } return true }) if err != nil { log.Printf("讀取源容器標準輸出失敗") // signalExit <- true break } // time.Sleep(time.Microsecond * 100) } }() /* 注意, 這里不能開啟并發協程去讀取目的容器的標準輸出, 如果開啟可能會與發送數據的協程搶鎖, 從而阻塞向目的容器發送數據*/ // // 從目的容器獲取標準輸出到數據通道中 // go func() { // // i := 0 // for { // // 該過濾器直接返回true, 僅占位 // err := wsDst.Receive("目的容器", dstStdOutCh, func(input string) bool { // return true // }) // if err != nil { // log.Printf("從目的容器讀取數據失敗") // break // } // // wsDst.Send() // time.Sleep(time.Microsecond * 100000) // } // // log.Printf("從目的容器讀取數據, 第%d次循環", i) // // i++ // }() // //從數據通道中讀取, 目的容器的標準輸出, 并打印 // go func() { // BreakPrintDstPodStdout: // for { // select { // case data := <-dstStdOutCh: // log.Printf("目的容器標準輸出:%s", string(data)) // // time.Sleep(time.Microsecond * 200) // case <-signalStopPrintDstStdout: // log.Printf("收到信號, 停止打印目的容器標準輸出") // // close(dataOutput) // // close(dataCh) // // signalStopRead <- true // // log.Printf("發送停止讀信號") // // close(dataOutput) // // close(dataCh) // break BreakPrintDstPodStdout // } // // time.Sleep(time.Microsecond * 100) // } // }() //從源容器標準輸出的數據通道獲取數據, 然后發送給目的容器標準輸入 //定義超時時間 timeOutSecond := 3 timer := time.NewTimer(time.Second * time.Duration(timeOutSecond)) Break2Main: for { select { case data := <-srcStdOutCh: wsDst.SendWithFilter("向目的容器發送", string(data)) // time.Sleep(time.Millisecond * 200) timer.Reset(time.Second * time.Duration(timeOutSecond)) case <-timer.C: // time.Sleep(time.Second * 5) log.Printf("================ 源容器標準輸出,沒有新的數據,獲取超時,停止向目的容器發送數據 ================") // log.Printf("發送信號:停止打印目的容器標準輸出") // signalStopPrintDstStdout <- true log.Printf("發送信號:停止從源容器讀取數據") wsSrc.Conn.Close() // log.Printf("發送信號:停止從目的容器讀取數據") // wsDst.Conn.Close() log.Printf("發送信號:主函數可以退出了") signalExit <- true log.Printf("所有信號發送完畢") log.Printf("================== 跳出循環 =================") break Break2Main } // time.Sleep(time.Microsecond * 1000) } // signalStopRead <- true <-signalExit //阻塞通道, 直到收到一個信號 // signalStopRead <- true log.Printf("主函數收到信號, 準備退出") // close(dataCh) // time.Sleep(time.Second) // close(dataOutput) // time.Sleep(time.Second) // select {} }
cp_test.go
package cpFilePod2Pod
import (
"log"
"testing"
)
// go test -race -test.run TestCpPod2Pod 切到該目錄執行該測試
func TestCpPod2Pod(t *testing.T) {
log.Printf("開始測試")
CpPod2Pod()
}
參考結果: 源容器: root@xxx-mysql-0:/var/log/mysql# md5sum slow.log 16577613b6ea957ecb5d9d5e976d9c50 slow.log 目的容器: root@xxx-75bdcdb8cf-hq9wf:/tmp/var/log/mysql# md5sum slow.log 16577613b6ea957ecb5d9d5e976d9c50 slow.log
看完上述內容,你們對K8S中如何利用Exec Websocket接口實現Pod間的文件拷貝有進一步的了解嗎?如果還想了解更多知識或者相關內容,請關注億速云行業資訊頻道,感謝大家的支持。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。