中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

flink中如何實現基于k8s的環境搭建

發布時間:2021-11-18 16:43:27 來源:億速云 閱讀:270 作者:小新 欄目:云計算

這篇文章主要介紹了flink中如何實現基于k8s的環境搭建,具有一定借鑒價值,感興趣的朋友可以參考下,希望大家閱讀完這篇文章之后大有收獲,下面讓小編帶著大家一起了解一下。

前面寫了一些flink的基礎組件,但是還沒有說過flink的環境搭建,現在我們來說下基本的環境搭建
1. 使用StatefulSet的原因
對于Flink來說,使用sts的最大的原因是pod的hostname是有序的;這樣潛在的好處有
hostname為-0和-1的pod可以直接指定為jobmanager;可以使用一個statefulset啟動一個cluster,而deployment必須2個;Jobmanager和TaskManager分別獨立的deployment
pod由于各種原因fail后,由于StatefulSet重新拉起的pod的hostname不變,集群recover的速度理論上可以比deployment更快(deployment每次主機名隨機)
2.使用StatefulSet部署Flink
2.1 docker的entrypoint
由于要由主機名來判斷是啟動jobmanager還是taskmanager,因此需要在entrypoint中去匹配設置的jobmanager的主機名是否有一致
傳入參數為:cluster ha;則自動根據主機名判斷啟動那個角色;也可以直接指定角色名稱
docker-entrypoint.sh的腳本內容如下:

#!/bin/sh
 
# If unspecified, the hostname of the container is taken as the JobManager address
ACTION_CMD="$1"
# if use cluster model, pod ${JOB_CLUSTER_NAME}-0,${JOB_CLUSTER_NAME}-1 as jobmanager
if [ ${ACTION_CMD} == "cluster" ]; then
  jobmanagers=(${JOB_MANGER_HOSTS//,/ })
  ACTION_CMD="taskmanager"
  for i in ${!jobmanagers[@]}
  do
      if [ "$(hostname -s)" == "${jobmanagers[i]}" ]; then
          ACTION_CMD="jobmanager"
          echo "pod hostname match jobmanager config host, change action to jobmanager."
      fi
  done
fi
 
# if ha model, replace ha configuration
if [ "$2" == "ha" ]; then
  sed -i -e "s|high-availability.cluster-id: cluster-id|high-availability.cluster-id: ${FLINK_CLUSTER_IDENT}|g" "$FLINK_CONF_DIR/flink-conf.yaml"
  sed -i -e "s|high-availability.zookeeper.quorum: localhost:2181|high-availability.zookeeper.quorum: ${FLINK_ZK_QUORUM}|g" "$FLINK_CONF_DIR/flink-conf.yaml"
  sed -i -e "s|state.backend.fs.checkpointdir: checkpointdir|state.backend.fs.checkpointdir: hdfs:///user/flink/flink-checkpoints/${FLINK_CLUSTER_IDENT}|g" "$FLINK_CONF_DIR/flink-conf.yaml"
  sed -i -e "s|high-availability.storageDir: hdfs:///flink/ha/|high-availability.storageDir: hdfs:///user/flink/ha/${FLINK_CLUSTER_IDENT}|g" "$FLINK_CONF_DIR/flink-conf.yaml"
fi
 
if [ ${ACTION_CMD} == "help" ]; then
    echo "Usage: $(basename "$0") (cluster ha|jobmanager|taskmanager|local|help)"
    exit 0
elif [ ${ACTION_CMD} == "jobmanager" ]; then
    JOB_MANAGER_RPC_ADDRESS=${JOB_MANAGER_RPC_ADDRESS:-$(hostname -f)}
    echo "Starting Job Manager"
    sed -i -e "s/jobmanager.rpc.address: localhost/jobmanager.rpc.address: ${JOB_MANAGER_RPC_ADDRESS}/g" "$FLINK_CONF_DIR/flink-conf.yaml"
    sed -i -e "s/jobmanager.heap.mb: 1024/jobmanager.heap.mb: ${JOB_MANAGER_HEAP_MB}/g" "$FLINK_CONF_DIR/flink-conf.yaml"
 
    echo "config file: " && grep '^[^\n#]' "$FLINK_CONF_DIR/flink-conf.yaml"
    exec "$FLINK_HOME/bin/jobmanager.sh" start-foreground cluster
 
elif [ ${ACTION_CMD} == "taskmanager" ]; then
    TASK_MANAGER_NUMBER_OF_TASK_SLOTS=${TASK_MANAGER_NUMBER_OF_TASK_SLOTS:-$(grep -c ^processor /proc/cpuinfo)}
    echo "Starting Task Manager"
 
    sed -i -e "s/taskmanager.heap.mb: 1024/taskmanager.heap.mb: ${TASK_MANAGER_HEAP_MB}/g" "$FLINK_CONF_DIR/flink-conf.yaml"
    sed -i -e "s/taskmanager.numberOfTaskSlots: 1/taskmanager.numberOfTaskSlots: $TASK_MANAGER_NUMBER_OF_TASK_SLOTS/g" "$FLINK_CONF_DIR/flink-conf.yaml"
 
    echo "config file: " && grep '^[^\n#]' "$FLINK_CONF_DIR/flink-conf.yaml"
    exec "$FLINK_HOME/bin/taskmanager.sh" start-foreground
elif [ ${ACTION_CMD} == "local" ]; then
    echo "Starting local cluster"
    exec "$FLINK_HOME/bin/jobmanager.sh" start-foreground local
fi
 
exec "$@"


2.2. 使用ConfigMap分發hdfs和flink配置文件
ConfigMap介紹參考:
https://kubernetes.io/docs/tasks/configure-pod-container/configure-pod-configmap/#create-configmaps-from-files
Q:為什么使用ConfigMap
A:由于hadoop配置文件在不同的環境不一樣,不方便打包到鏡像里面;因此合適的方式就只有2種,使用ConfigMap和Pod的InitContainer。使用InitContainer的話,可以wget獲取遠程的一個配置文件,但是這樣還需要依賴一個配置服務。相比而已,ConfigMap更簡單。
創建ConfigMap
[root@rc-mzgjg ~]# kubectl create configmap hdfs-conf --from-file=hdfs-site.xml --from-file=core-site.xml
[root@rc-mzgjg ~]# kubectl create configmap flink-conf --from-file=flink-conf/log4j-console.properties --from-file=flink-conf/flink-conf.yaml
使用describe命令查看創建的名詞為hdfs-conf的ConfigMap,會顯示文件的內容到控制臺
[root@rc-mzgjg ~]# kubectl describe configmap hdfs-conf
Name:         hdfs-conf
Namespace:    default
Labels:       <none>
Annotations:  <none>
Data
====
core-site.xml:
通過volumeMounts使用ConfigMap
Pod的Container要使用配置文件,則可以通過volumeMounts方式掛載到Container中。如下demo所示,將配置文件掛載到/home/xxxx/conf/hadoop目錄下

apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: flink-jm
spec:
  selector:
    matchLabels:
      app: flink-jm
  serviceName: flink-jm
  replicas: 2
  podManagementPolicy: Parallel
  template:
    metadata:
      labels:
        app: flink-jm
    spec:
      terminationGracePeriodSeconds: 2
      containers:
      - name: test
        imagePullPolicy: Always
        image: ip:5000/test:latest
        args: ["sleep", "1d"]
        volumeMounts:
        - name: hdfs-conf
          mountPath: /home/xxxx/conf/hadoop
      volumes:
      - name: hdfs-conf
        configMap:
        # Provide the name of the ConfigMap containing the files you want to add to the container
          name: hdfs-conf


創建好Pod后,查看配置文件的掛載
[hadoop@flink-jm-0 hadoop]$ ll /home/xxxx/conf/hadoop
total 0
lrwxrwxrwx. 1 root root 20 Apr  9 06:54 core-site.xml -> ..data/core-site.xml
lrwxrwxrwx. 1 root root 20 Apr  9 06:54 hdfs-site.xml -> ..data/hdfs-site.xml
配置文件是鏈接到了..data目錄
1.10才能支持Pod多Container的namespace共享
最初的想法是一個Pod里面多個Container,然后配置文件是其中一個Container;測試驗證起數據目錄并不能互相訪問;如預想的配置,其中一個Container里面的image是hdfs-conf的配置文件

containers:
     - name: hdfs-conf
       imagePullPolicy: Always
       image: ip:5000/hdfs-dev:2.6
       args: ["sleep", "1d"]
     - name: flink-jm
       imagePullPolicy: Always
       image: ip:5000/flink:1.4.2


實際驗證,兩個Container的只能共享網絡,文件目錄彼此看不見
“Share Process Namespace between Containers in a Pod”這個是Kubernates 1.10才開始支持,參考
https://kubernetes.io/docs/tasks/configure-pod-container/share-process-namespace/
2.3 StatefulSet的配置
Flink的配置文件和hadoop的配置文件,依賴ConfigMap來分發

環境變量名稱

參數

內容

 說明

 

FLINK_CLUSTER_IDENT
namespace/StatefulSet.name
default/flink-cluster
用來做zk ha設置和hdfs checkpiont的根目錄 
FLINK_ZK_QUORUM
env:FLINK_ZK_QUORUM
ip:2181
HA ZK的地址 
JOB_MANAGER_HEAP_MB

env:JOB_MANAGER_HEAP_MB

value:containers.resources.memory.limit -1024

512JM的Heap大小,由于存在Netty的堆外內存,需要小于container.resources.memory.limits;否則容易OOM kill 
JOB_MANGER_HOSTS
StatefulSet.name-0,StatefulSet.name-1
flink-cluster-0,flink-cluster-1
JM的主機名,短主機名;可以不用FQDN 
TASK_MANAGER_HEAP_MB

env:TASK_MANAGER_HEAP_MB 

value: containers.resources.memory.limit -1024

512TM的Heap大小,由于存在Netty的堆外內存,需要小于container.resources.memory.limits;否則容易OOM kill 
TASK_MANAGER_NUMBER_OF_TASK_SLOTS

containers.resources.cpu.limits

2TM的slot數量,根據resources.cpu.limits來設置 

Pod的imagePullPolicy策略,測試環境Always,每次都pull,方便驗證;線上則是IfNotPresent;線上如果對images做了變更,必須更改images的tag
完整的內容可以參考如下:

# headless service for statefulset
apiVersion: v1
kind: Service
metadata:
  name: flink-cluster
  labels:
    app: flink-cluster
spec:
  clusterIP: None
  ports:
    - port: 8080
      name: ui
  selector:
    app: flink-cluster
---
# create flink statefulset
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: flink-cluster
spec:
  selector:
    matchLabels:
      app: flink-cluster
  serviceName: flink-cluster
  replicas: 4
  podManagementPolicy: Parallel
  template:
    metadata:
      labels:
        app: flink-cluster
    spec:
      terminationGracePeriodSeconds: 2
      containers:
      - name: flink-cluster
        imagePullPolicy: Always
        image: ip:5000/flink:1.4.2
        args: ["cluster", "ha"]
        volumeMounts:
          - name: hdfs-conf
            mountPath: /home/xxxx/conf/hadoop
          - name: flink-conf
            mountPath: /home/xxxx/conf/flink
          - name: flink-log
            mountPath: /home/xxxx/logs
        resources:
          requests:
            memory: "1536Mi"
            cpu: 1
          limits:
            memory: "1536Mi"
            cpu: 2
        env:
        - name: JOB_MANGER_HOSTS
          value: "flink-cluster-0,flink-cluster-1"
        - name: FLINK_CLUSTER_IDENT
          value: "default/flink-cluster"
        - name: TASK_MANAGER_NUMBER_OF_TASK_SLOTS
          value: "2"
        - name: FLINK_ZK_QUORUM
          value: "ip:2181"
        - name: JOB_MANAGER_HEAP_MB
          value: "512"
        - name: TASK_MANAGER_HEAP_MB
          value: "512"
        ports:
        - containerPort: 6124
          name: blob
        - containerPort: 6125
          name: query
        - containerPort: 8080
          name: flink-ui
      volumes:
        - name: hdfs-conf
          configMap:
          # Provide the name of the ConfigMap containing the files you want to add to the container
            name: hdfs-conf
        - name: flink-conf
          configMap:
            name: flink-conf
        - name: flink-log
          hostPath:
            # directory location on host
            path: /tmp
            # this field is optional
            type: Directory


 
3. 測試環境對外暴露Flink UI
由于測試環境使用Flannel進行網絡通信,在K8S集群外部無法訪問到Flink UI的IP和端口,因此需要通過NodePort方式將內部IP映射出來。配置如下:

# only for test k8s cluster
# use service to expose flink jobmanager 0's web port
apiVersion: v1
kind: Service
metadata:
  labels:
    app: flink-cluster
    statefulset.kubernetes.io/pod-name: flink-cluster-0
  name: flink-web-0
  namespace: default
spec:
  ports:
  - port: 8080
    protocol: TCP
    targetPort: 8080
  selector:
    app: flink-cluster
    statefulset.kubernetes.io/pod-name: flink-cluster-0
  type: NodePort
---
# use service to expose flink jobmanager 1's web port
apiVersion: v1
kind: Service
metadata:
  labels:
    app: flink-cluster
    statefulset.kubernetes.io/pod-name: flink-cluster-1
  name: flink-web-1
  namespace: default
spec:
  ports:
  - port: 8080
    protocol: TCP
    targetPort: 8080
  selector:
    app: flink-cluster
    statefulset.kubernetes.io/pod-name: flink-cluster-1
  type: NodePort


 
4. 服務部署狀態
執行完前面操作后,可以查看到當前的StatefulSet狀態
[root@rc-mzgjg ~]# kubectl get sts flink-cluster -o wide
NAME            DESIRED   CURRENT   AGE       CONTAINERS      IMAGES
flink-cluster   4         4         1h        flink-cluster   ip:5000/flink:1.4.2
容器的Pod狀態
[root@rc-mzgjg ~]# kubectl get pod -l app=flink-cluster  -o wide
NAME              READY     STATUS    RESTARTS   AGE       IP            NODE
flink-cluster-0   1/1       Running   0          1h        ip1   ip5
flink-cluster-1   1/1       Running   0          1h        ip2   ip6
flink-cluster-2   1/1       Running   0          1h        ip3   ip7
flink-cluster-3   1/1       Running   0          1h        ip4   ip8
相關的Service信息
[root@rc-mzgjg ~]# kubectl get svc -l app=flink-cluster  -o wide
NAME            TYPE        CLUSTER-IP       EXTERNAL-IP   PORT(S)          AGE       SELECTOR
flink-cluster   ClusterIP   None             <none>        8080/TCP         2h        app=flink-cluster
flink-web-0     NodePort    10.254.8.103     <none>        8080:30495/TCP   1h        app=flink-cluster,statefulset.kubernetes.io/pod-name=flink-cluster-0
flink-web-1     NodePort    10.254.172.158   <none>        8080:30158/TCP   1h        app=flink-cluster,statefulset.kubernetes.io/pod-name=flink-cluster-1
根據Service的信息;可以通過任何一個k8s node的ip地址加PORT來訪問Flink UI

這里主要說一下,在搭建的過程中遇到了一個和權限相關的問題
錯誤日志如下
ERROR setFile(null,true) call failed
FileNotFoundException:no such file or directory
原因:是因為flink服務缺少日志目錄的權限
修改方式:
1.adduser flink 添加相應的用戶
2.chown -R flink:flink /home/xxxx/logs

感謝你能夠認真閱讀完這篇文章,希望小編分享的“flink中如何實現基于k8s的環境搭建”這篇文章對大家有幫助,同時也希望大家多多支持億速云,關注億速云行業資訊頻道,更多相關知識等著你來學習!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

嘉荫县| 古田县| 休宁县| 淳化县| 抚顺县| 徐汇区| 赤城县| 明光市| 曲阳县| 甘德县| 广东省| 花垣县| 读书| 吉首市| 牡丹江市| 定南县| 朝阳市| 子洲县| 静海县| 同仁县| 海淀区| 青川县| 乌苏市| 乌拉特后旗| 明水县| 临漳县| 台东市| 新民市| 女性| 漳浦县| 郯城县| 慈利县| 聊城市| 武夷山市| 卓资县| 乌兰察布市| 新乐市| 兴文县| 秦皇岛市| 瓮安县| 东辽县|