中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

運維系列之FastAPI接口服務怎么用

發布時間:2021-12-30 10:58:06 來源:億速云 閱讀:163 作者:小新 欄目:大數據

小編給大家分享一下運維系列之FastAPI接口服務怎么用,希望大家閱讀完這篇文章之后都有所收獲,下面讓我們一起去探討吧!

第一部分:FastAPI接口部分

 from fastapi import FastAPI, Path 

from com.fy.fastapi.monitor.shell.fabric.FabricLinux import FabricLinux

app = FastAPI()



 #執行shell命令;

#調用示例:http://127.0.0.1:8000/items/3?q=%E6%B5%8B%E8%AF%95

@app.get("/cmd/{cmd}")

def runCommand(cmd, host, userName, password):

    fabric = FabricLinux(host , userName , password)

    result = fabric.runCommand(cmd)

    fabric.closeClient()

    return {"res": result  }



#上傳文件;

#調用示例:http://127.0.0.1:8000/items/3?q=%E6%B5%8B%E8%AF%95

@app.get("/upload/{srcFile}")

def upload(srcFile, targetDir, host, userName, password):

    fabric = FabricLinux(host , userName , password)

    result = fabric.upload(srcFile, targetDir)

    fabric.closeClient()

    return {"res": result  }



#下載文件;

#調用示例:http://127.0.0.1:8000/items/3?q=%E6%B5%8B%E8%AF%95

@app.get("/download/{srcFile}")

def download(srcFile, targetDir, host, userName, password):

    #fabricu.download("/home/Crawler/WeChatSouGouMain.tar.gz", "D:\\txt\\WeChatSouGouMain.tar.gz")

    fabric = FabricLinux(host , userName , password)

    result = fabric.download(srcFile, targetDir)

    fabric.closeClient()

    return {"res": result  }



#刪除文件

#調用示例:http://127.0.0.1:8000/items/3?q=%E6%B5%8B%E8%AF%95

@app.get("/delete/{targetPath}")

def delete(targetPath:"必須是全路徑", host, userName, password):

    fabric = FabricLinux(host , userName , password)

    result = fabric.runCommand("rm -rf " + targetPath)

    fabric.closeClient()

    return {"res": result  }



#解壓.tar.gz文件

#調用示例:http://127.0.0.1:8000/items/3?q=%E6%B5%8B%E8%AF%95

@app.get("/delete/{targetPath}")

def decomTarGz(srcPath, tarPath, host, userName, password):

    fabric = FabricLinux(host , userName , password)

    result = fabric.decomTarGz(srcPath, tarPath)

    fabric.closeClient()

    return {"res": result  }



#根據PID關閉相關的服務;

#調用示例:http://127.0.0.1:8000/items/3?q=%E6%B5%8B%E8%AF%95

@app.get("/kill/{pid}")

def stop(pid: int=Path(..., gt=0, le=32767), host, userName, password):

    ''' gt: 大于; le: 小于等于 '''

    fabric = FabricLinux(host , userName , password)

    result = fabric.runCommand("kill -9 " + pid)

    fabric.closeClient()

    return {"res": result  }



#根據Python命令符,以及啟動文件路徑,啟動相應的服務;

#調用示例:http://127.0.0.1:8000/start/python?startFile=%E6%B5%8B%E8%AF%95

@app.get("/start/{pycmd}")

def start(pycmd  , startFile , host, userName, password):

    fabric = FabricLinux(host , userName , password)

    result = fabric.runCommand("nohup " + pycmd + " " + startFile + " &")

    fabric.closeClient()

    return {"res": result  }

第二部分:fabric接口部分,主要用來執行命令行

運維系列之FastAPI接口服務怎么用

from fabric import Connection

import traceback, os

class FabricLinux:

    def __init__(self, host:"服務器IP", userName:"用戶名", password:"密碼"):

        self.host = host

        self.userName = userName

        self.password = password

        print(self.userName + "@" + self.host, {"password": self.password})

        self.initClient()#初始化鏈接

    

    #初始化ssh鏈接對象;

    def initClient(self):

        #如果服務器配置了SSH免密碼登錄,就不需要 connect_kwargs 來指定密碼了。

        self.con = Connection(self.userName + "@" + self.host, connect_kwargs={"password": self.password})

    

    #關閉fabric操作對象;

    def closeClient(self):

        self.con.close()

        

    #執行shell命令;

    def runCommand(self, sshCommand:"Linux命令行語句"):

        #top命令尚未測試通過;

        #如果命令行中包含路徑,最好使用絕對路徑;

        try:

            #語法:run('遠程命令')

            result = self.con.run(sshCommand, hide=True)

            if result.return_code == 0:# 返回碼,0表示正確執行,1表示錯誤

                return True, result.stdout                         

            return result.failed, result.stdout

        except:

            exp = traceback.format_exc()

            if "mkdir" in exp and 'File exists' in exp:

                print("目錄【", sshCommand, "】已存在")

            else:

                print(exp)

            return False, exp


    #在特定目錄下,執行shell命令;

    def runDir(self, sshCommand:"Linux命令行語句", dir):

        if not os.path.isdir(dir):

            return "目標路徑錯誤,必須是目錄"

        with self.cd(dir):#with表示,with塊的代碼是在dir目錄下執行;

            return self.runCommand(sshCommand)

    

    #解壓.tar.gz文件;

    def decomTarGz(self, srcPath, tarPath):

        if os.path.isdir(srcPath):

            return "帶解壓的路徑,必須是文件類型"

        if not os.path.isdir(tarPath):

            return "目標路徑錯誤,必須是目錄"

        return fabricu.runCommand("tar -zxvf " + srcPath + " -C " + tarPath)

    

    #切換到某個目錄下(上下文不連貫)

    def cd(self, dir):

        #語法:cd('遠程目錄')

        self.con.cd(dir)

    

    #上傳本地文件到遠程主機

    def upload(self, src:"待上傳的文件全路徑。路徑中最好不要有空格等", target:"保存到服務器上的目錄"):

        if not os.path.isdir(target):

            return "待上傳的文件全路徑"

        elif " " in src:

            return "路徑中不允許有空格、(、)等特殊字符"

        #語法:put('本地文件', '遠程目錄')

        else:return self.con.put(src, target)

    

    #從遠程主機下載文件到本地

    def download(self , src:"遠程文件全路徑", target:"本地文件路徑(必須包含文件名稱的全路徑)"):

        #語法:get('遠程文件', '本地文件路徑')

        #示例:fabricu.download("/home/Crawler/WeChatSouGouMain.tar.gz", "D:\\txt\\WeChatSouGouMain.tar.gz")

        if not os.path.isdir(target):

            return self.con.get(src, target)

        else:return "目標路徑錯誤,必須是包含文件名稱的全路徑"

看完了這篇文章,相信你對“運維系列之FastAPI接口服務怎么用”有了一定的了解,如果想了解更多相關知識,歡迎關注億速云行業資訊頻道,感謝各位的閱讀!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

堆龙德庆县| 长海县| 崇明县| 莱西市| 北京市| 高雄市| 华宁县| 孟州市| 丽水市| 娱乐| 柳州市| 土默特左旗| 怀化市| 铁岭市| 万州区| 东光县| 江北区| 博湖县| 宝清县| 祥云县| 奎屯市| 津南区| 通榆县| 枣强县| 岱山县| 乐业县| 凌云县| 沁阳市| 湖南省| 韶关市| 高安市| 武汉市| 枞阳县| 霸州市| 伊宁市| 西平县| 乐清市| 留坝县| 湖北省| 霍城县| 宁远县|