中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

TensorFlow分布式實踐

發布時間:2020-05-24 07:39:52 來源:網絡 閱讀:1910 作者:個推 欄目:建站服務器

大數據時代,基于單機的建模很難滿足企業不斷增長的數據量級的需求,開發者需要使用分布式的開發方式,在集群上進行建模。而單機和分布式的開發代碼有一定的區別,本文就將為開發者們介紹,基于TensorFlow進行分布式開發的兩種方式,幫助開發者在實踐的過程中,更好地選擇模塊的開發方向。


基于TensorFlow原生的分布式開發

分布式開發會涉及到更新梯度的方式,有同步和異步的兩個方案,同步更新的方式在模型的表現上能更快地進行收斂,而異步更新時,迭代的速度則會更加快。兩種更新方式的圖示如下:

TensorFlow分布式實踐

同步更新流程
(圖片來源:TensorFlow:Large-Scale Machine Learning on Heterogeneous Distributed Systems)

TensorFlow分布式實踐

異步更新流程
(圖片來源:TensorFlow:Large-Scale Machine Learning on Heterogeneous Distributed Systems)

TensorFlow是基于ps、work 兩種服務器進行分布式的開發。ps服務器可以只用于參數的匯總更新,讓各個work進行梯度的計算。

基于TensorFlow原生的分布式開發的具體流程如下:

首先指定ps 服務器啟動參數 –job_name=ps:

python distribute.py --ps_hosts=192.168.100.42:2222 --worker_hosts=192.168.100.42:2224,192.168.100.253:2225 --job_name=ps --task_index=0

接著指定work服務器參數(啟動兩個work 節點) –job_name=work2:

python distribute.py --ps_hosts=192.168.100.42:2222 --worker_hosts=192.168.100.42:2224,192.168.100.253:2225 --job_name=worker --task_index=0
python distribute.py --ps_hosts=192.168.100.42:2222 --worker_hosts=192.168.100.42:2224,192.168.100.253:2225 --job_name=worker --task_index=1

之后,上述指定的參數 worker_hosts ps_hosts job_name task_index 都需要在py文件中接受使用:

tf.app.flags.DEFINE_string("worker_hosts", "默認值", "描述說明")

接收參數后,需要分別注冊ps、work,使他們各司其職:

ps_hosts = FLAGS.ps_hosts.split(",")
worker_hosts = FLAGS.worker_hosts.split(",")
cluster = tf.train.ClusterSpec({"ps": ps_hosts, "worker": worker_hosts})
server = tf.train.Server(cluster,job_name=FLAGS.job_name,task_index=FLAGS.task_index)

issync = FLAGS.issync
if FLAGS.job_name == "ps":
   server.join()
elif FLAGS.job_name == "worker":
   with tf.device(tf.train.replica_device_setter(
                   worker_device="/job:worker/task:%d" % FLAGS.task_index,
                   cluster=cluster)):

繼而更新梯度。

(1)同步更新梯度:

rep_op = tf.train.SyncReplicasOptimizer(optimizer,
                                               replicas_to_aggregate=len(worker_hosts),
                                               replica_id=FLAGS.task_index,
                                               total_num_replicas=len(worker_hosts),
                                               use_locking=True)
train_op = rep_op.apply_gradients(grads_and_vars,global_step=global_step)
init_token_op = rep_op.get_init_tokens_op()
chief_queue_runner = rep_op.get_chief_queue_runner()

(2)異步更新梯度:

train_op = optimizer.apply_gradients(grads_and_vars,global_step=global_step)

最后,使用tf.train.Supervisor 進行真的迭代

另外,開發者還要注意,如果是同步更新梯度,則還需要加入如下代碼:

sv.start_queue_runners(sess, [chief_queue_runner])
sess.run(init_token_op)

需要注意的是,上述異步的方式需要自行指定集群IP和端口,不過,開發者們也可以借助TensorFlowOnSpark,使用Yarn進行管理。

基于TensorFlowOnSpark的分布式開發

作為個推面向開發者服務的移動APP數據統計分析產品,個數所具有的用戶行為預測功能模塊,便是基于TensorFlowOnSpark這種分布式來實現的。基于TensorFlowOnSpark的分布式開發使其可以在屏蔽了端口和機器IP的情況下,也能夠做到較好的資源申請和分配。而在多個千萬級應用同時建模的情況下,集群也有良好的表現,在sparkUI中也能看到相對應的資源和進程的情況。最關鍵的是,TensorFlowOnSpark可以在單機過度到分布式的情況下,使代碼方便修改,且容易部署。

基于TensorFlowOnSpark的分布式開發的具體流程如下:

首先,需要使用spark-submit來提交任務,同時指定spark需要運行的參數(–num-executors 6等)、模型代碼、模型超參等,同樣需要接受外部參數:

parser = argparse.ArgumentParser()
parser.add_argument("-i", "--tracks", help="數據集路徑")  
args = parser.parse_args()

之后,準備好參數和訓練數據(DataFrame),調用模型的API進行啟動。

其中,soft_dist.map_fun是要調起的方法,后面均是模型訓練的參數。

estimator = TFEstimator(soft_dist.map_fun, args) \
     .setInputMapping({'tracks': 'tracks', 'label': 'label'}) \
     .setModelDir(args.model) \
     .setExportDir(args.serving) \
     .setClusterSize(args.cluster_size) \
     .setNumPS(num_ps) \
     .setEpochs(args.epochs) \
     .setBatchSize(args.batch_size) \
     .setSteps(args.max_steps)
   model = estimator.fit(df)

接下來是soft_dist定義一個 map_fun(args, ctx)的方法:

def map_fun(args, ctx):
...
worker_num = ctx.worker_num  # worker數量
job_name = ctx.job_name  # job名
task_index = ctx.task_index  # 任務索引
if job_name == "ps":  # ps節點(主節點)
  time.sleep((worker_num + 1) * 5)
  cluster, server = TFNode.start_cluster_server(ctx, 1, args.rdma)
  num_workers = len(cluster.as_dict()['worker'])
  if job_name == "ps":
       server.join()
  elif job_name == "worker":
       with tf.device(tf.train.replica_device_setter(worker_device="/job:worker/task:%d" % task_index, cluster=cluster)):

之后,可以使用tf.train.MonitoredTrainingSession高級API,進行模型訓練和預測。

總結

基于TensorFlow的分布式開發大致就是本文中介紹的兩種情況,第二種方式可以用于實際的生產環境,穩定性會更高。

在運行結束的時候,開發者們也可通過設置郵件的通知,及時地了解到模型運行的情況。

同時,如果開發者使用SessionRunHook來保存最后輸出的模型,也需要了解到,框架代碼中的一個BUG,即它只能在規定的時間內保存,超出規定時間,即使運行沒有結束,程序也會被強制結束。如果開發者使用的版本是未修復BUG的版本,則要自行處理,放寬運行時間。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

景谷| 年辖:市辖区| 徐州市| 玉山县| 德安县| 嵊州市| 邯郸市| 绵阳市| 藁城市| 咸阳市| 汝州市| 祁连县| 阳高县| 西峡县| 蒙山县| 莎车县| 庆阳市| 布尔津县| 宣化县| 蚌埠市| 望江县| 邹平县| 望城县| 永顺县| 乳山市| 中超| 德昌县| 郎溪县| 定西市| 阳春市| 阜阳市| 盐边县| 新昌县| 潼南县| 隆尧县| 沂水县| 同德县| 专栏| 正蓝旗| 宁陵县| 许昌县|