中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Python3 操作 HDFS

發布時間:2020-07-10 09:29:41 來源:網絡 閱讀:5265 作者:RQSLT 欄目:大數據

【第三方包】

  • pyhdfs(pypi,github,支持HA)


【功能】

  • 重命名 hdfs 文件或目錄

# encoding: utf-8
# author: walker
# date: 2018-03-17 
# summary: 利用 pyhdfs 重命名 hdfs 文件或目錄

import os, sys, time
from pyhdfs import HdfsClient

SrcPath = '/test/xxx'
DstPath = '/test/yyy'
NameNode = 'nn1.example.com:50070,nn2.example.com:50070'


# 將 SrcPath 改名為 DstPath
def Rename(SrcPath, DstPath):
	fs = HdfsClient(hosts=NameNode)
	if not fs.exists(SrcPath):
		print('Error: not found %s' % SrcPath)
		sys.exit(-1)
		
	print('Reanme ... \n%s\n -> \n%s \n' % (SrcPath, DstPath))
	
	fs.rename(SrcPath, DstPath)
	
	
if __name__ == '__main__':
	Rename(SrcPath, DstPath)
  • 上傳文件

# encoding: utf-8
# author: walker
# date: 2018-01-23
# summary: 上傳本地文件到 hdfs 目錄

import os, sys, time
from pyhdfs import HdfsClient
from configparser import ConfigParser

cur_dir_fullpath = os.path.dirname(os.path.abspath(__file__))
StartTime = time.time()
FileSize = 0		#文件總大小

LocalDir = ''
HdfsDir = ''
NameNode = ''
UserName = ''

#讀取配置文件	
def ReadConfig():
	global LocalDir, HdfsDir, NameNode, UserName

	cfg = ConfigParser()
	cfgFile = os.path.join(cur_dir_fullpath, 'config.ini')
	if not os.path.exists(cfgFile):
		input(cfgFile + ' not found')
		sys.exit(-1)
	cfgLst = cfg.read(cfgFile)
	if len(cfgLst) < 1:
		input('Read config.ini failed...')
		sys.exit(-1)

	
	LocalDir = cfg.get('config', 'LocalDir').strip()   
	if not os.path.exists(LocalDir):
		input(LocalDir + ' not found')
		sys.exit(-1)
	print('LocalDir:' + LocalDir)
	
	HdfsDir = cfg.get('config', 'HdfsDir').strip() 
	print('HdfsDir:' + HdfsDir)	
	
	NameNode = cfg.get('config', 'NameNode').strip() 
	print('NameNode:' + NameNode)	

	UserName = cfg.get('config', 'UserName').strip() 
	print('UserName:' + UserName)	
	
	print('Read config.ini successed!')
	
#處理一個
def ProcOne(client, srcFile, dstFile):
	global FileSize
	print('ProcOne \n%s\n -> \n%s ' % (srcFile, dstFile))
	
	#目標文件已經存在且大小相同
	if client.exists(dstFile) and \
		(os.path.getsize(srcFile) == client.list_status(dstFile)[0].length):
		print('file exists: %s ' % dstFile)
		return True
	
	#注意,如果已存在會被覆蓋
	client.copy_from_local(srcFile, dstFile, overwrite=True)	
	
	#校驗文件大小
	if os.path.getsize(srcFile) == client.list_status(dstFile)[0].length:	
		FileSize += os.path.getsize(srcFile)
		return True
		
	return False
	
#處理所有
def ProcAll():	
	client = HdfsClient(hosts=NameNode, user_name=UserName)
	if not client.exists(HdfsDir):
		print(HdfsDir + ' not found')
		sys.exit(-1)	
	total = len(os.listdir(LocalDir))
	processed = 0
	failedList = list()
	for filename in os.listdir(LocalDir):
		srcFile = os.path.join(LocalDir, filename)
		dstFile = HdfsDir + '/' + filename
		if not ProcOne(client, srcFile, dstFile):
			failedList.append(srcFile)
		processed += 1		
		print('%d/%d/%d, time cost: %.2f s' % (total, processed, len(failedList), time.time()-StartTime))
		print('%d B, %.2f MB/s \n' % (FileSize, FileSize/1024/1024/(time.time()-StartTime)))
	
	if failedList:
		print('failedList: %s' % repr(failedList))
	else:
		print('Good! No Error!')
		print('%d B, %.2f MB, %.2f GB, %.2f MB/s' % \
            (FileSize, FileSize/1024/1024, FileSize/1024/1024/1024, FileSize/1024/1024/(time.time()-StartTime)))
	
if __name__ == '__main__':
	ReadConfig()
	ProcAll()
	print('Time total: %.2f s' % (time.time()-StartTime))
	print(time.strftime('%Y-%m-%d %H:%M:%S',time.localtime()))
  • 下載 HDFS 文件到本地

# encoding: utf-8
# author: walker
# date: 2018-06-07
# summary: 下載 HDFS 文件(或目錄)到本地

import os, sys, time
from pyhdfs import HdfsClient
from configparser import ConfigParser

cur_dir_fullpath = os.path.dirname(os.path.abspath(__file__))
StartTime = time.time()
FileSize = 0        #文件總大小

LocalDir = ''
HdfsDir = ''
NameNode = ''
UserName = ''

#讀取配置文件 
def ReadConfig():
    global LocalDir, HdfsDir, NameNode, UserName

    cfg = ConfigParser()
    cfgFile = os.path.join(cur_dir_fullpath, 'config.ini')
    if not os.path.exists(cfgFile):
        input(cfgFile + ' not found')
        sys.exit(-1)
    cfgLst = cfg.read(cfgFile)
    if len(cfgLst) < 1:
        input('Read config.ini failed...')
        sys.exit(-1)
    
    LocalDir = cfg.get('config', 'LocalDir').strip()
    if not os.path.exists(LocalDir):
        input(LocalDir + ' not found')
        sys.exit(-1)
    print('LocalDir:' + LocalDir)
    
    HdfsDir = cfg.get('config', 'HdfsDir').strip().rstrip('/')
    print('HdfsDir:' + HdfsDir) 
    
    NameNode = cfg.get('config', 'NameNode').strip()
    print('NameNode:' + NameNode)   

    UserName = cfg.get('config', 'UserName').strip()
    print('UserName:' + UserName)   
    
    print('Read config.ini successed!')
    
#處理一個
def ProcOne(client, srcFile, dstFile):
    global FileSize
    print('ProcOne \n%s\n -> \n%s ' % (srcFile, dstFile))

    dstDir = os.path.dirname(dstFile)
    if not os.path.exists(dstDir):
        os.makedirs(dstDir)
    
    # 目標文件已經存在且大小相同
    if os.path.exists(dstFile) and \
        (os.path.getsize(dstFile) == client.list_status(srcFile)[0].length):
        print('file exists: %s ' % dstFile)
        return True
    
    # 注意,如果已存在會被覆蓋
    client.copy_to_local(srcFile, dstFile, overwrite=True)
    
    if os.path.getsize(dstFile) != client.list_status(srcFile)[0].length:   #校驗文件大小
        return False

    FileSize += os.path.getsize(dstFile)
    return True
    
#處理所有
def ProcAll():  
    client = HdfsClient(hosts=NameNode, user_name=UserName)
    if not client.exists(HdfsDir):
        print(HdfsDir + ' not found')
        sys.exit(-1)    
        
    total = 0
    # 先遍歷一遍,得到總文件個數
    for parent, dirnames, filenames in client.walk(HdfsDir):
        for filename in filenames:
            total += 1
    processed = 0
    failedList = list()

    for parent, dirnames, filenames in client.walk(HdfsDir):
        for filename in filenames:
            srcFile = '%s/%s' % (parent, filename)
            relPath = srcFile[len(HdfsDir)+1:].replace('/', '\\')   # 相對于根目錄的路徑
            dstFile = os.path.join(LocalDir, relPath)
            if not ProcOne(client, srcFile, dstFile):
                failedList.append(srcFile)
            processed += 1      
            print('%d/%d/%d, time cost: %.2f s' % (total, processed, len(failedList), time.time()-StartTime))
            print('%d B, %.2f MB/s \n' % (FileSize, FileSize/1024/1024/(time.time()-StartTime)))
    
    if failedList:
        print('failedList: %s' % repr(failedList))
    else:
        print('Good! No Error!')
        print('%d B, %.2f MB, %.2f GB, %.2f MB/s' % \
(FileSize, FileSize/1024/1024, FileSize/1024/1024/1024, FileSize/1024/1024/(time.time()-StartTime)))
    
if __name__ == '__main__':
    ReadConfig()
    ProcAll()
    print('Time total: %.2f s' % (time.time()-StartTime))
    print(time.strftime('%Y-%m-%d %H:%M:%S',time.localtime()))


*** walker ***




向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

梁河县| 延庆县| 都兰县| 全椒县| 海南省| 德兴市| 绥滨县| 岳普湖县| 泸溪县| 衡东县| 河间市| 邻水| 五原县| 岑溪市| 府谷县| 文成县| 泸定县| 包头市| 绩溪县| 从化市| 平遥县| 安乡县| 哈巴河县| 齐河县| 灵山县| 永德县| 易门县| 榆树市| 济宁市| 西青区| 白水县| 依兰县| 晋城| 宜川县| 衢州市| 德化县| 玉林市| 屏东县| 华阴市| 瑞昌市| 丽江市|