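This article walks through the source code of the partitioning strategies in Flink: the ChannelSelector interface, the StreamPartitioner base class, and each concrete partitioner built on top of them.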
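Name
ChannelSelector
Implementation
public interface ChannelSelector<T extends IOReadableWritable> {

    /**
     * Initializes the number of channels. A channel can be thought of as one instance
     * of the downstream operator (one subtask of the parallel operator).
     */
    void setup(int numberOfChannels);

    /**
     * Decides, based on the current record and the total number of channels,
     * which downstream channel the record should be sent to.
     * Each partitioning strategy implements this method differently.
     */
    int selectChannel(T record);

    /**
     * Whether records are broadcast to all downstream operator instances.
     */
    boolean isBroadcast();
}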
Name
StreamPartitioner
Implementation
public abstract class StreamPartitioner<T> implements
        ChannelSelector<SerializationDelegate<StreamRecord<T>>>, Serializable {

    private static final long serialVersionUID = 1L;

    protected int numberOfChannels;

    @Override
    public void setup(int numberOfChannels) {
        this.numberOfChannels = numberOfChannels;
    }

    @Override
    public boolean isBroadcast() {
        return false;
    }

    public abstract StreamPartitioner<T> copy();
}
GlobalPartitioner
This partitioner sends every record to a single downstream operator instance (subtask id = 0).
/**
 * Sends all records to the first task of the downstream operator (ID = 0).
 * @param <T>
 */
@Internal
public class GlobalPartitioner<T> extends StreamPartitioner<T> {

    private static final long serialVersionUID = 1L;

    @Override
    public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
        // Always returns 0, i.e. records go only to the first downstream task
        return 0;
    }

    @Override
    public StreamPartitioner<T> copy() {
        return this;
    }

    @Override
    public String toString() {
        return "GLOBAL";
    }
}
ShufflePartitioner
Picks a downstream operator instance at random for each record.
/**
 * Selects a random channel for each record.
 * @param <T>
 */
@Internal
public class ShufflePartitioner<T> extends StreamPartitioner<T> {

    private static final long serialVersionUID = 1L;

    private Random random = new Random();

    @Override
    public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
        // Draws a pseudo-random number in [0, numberOfChannels) and sends the
        // record to that downstream task
        return random.nextInt(numberOfChannels);
    }

    @Override
    public StreamPartitioner<T> copy() {
        return new ShufflePartitioner<T>();
    }

    @Override
    public String toString() {
        return "SHUFFLE";
    }
}
BroadcastPartitioner
Sends every record to all downstream operator instances.
/**
 * Sends to all channels.
 */
@Internal
public class BroadcastPartitioner<T> extends StreamPartitioner<T> {

    private static final long serialVersionUID = 1L;

    /**
     * In broadcast mode records go directly to every downstream task,
     * so there is no channel to select through this method.
     */
    @Override
    public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
        throw new UnsupportedOperationException("Broadcast partitioner does not support select channels.");
    }

    @Override
    public boolean isBroadcast() {
        return true;
    }

    @Override
    public StreamPartitioner<T> copy() {
        return this;
    }

    @Override
    public String toString() {
        return "BROADCAST";
    }
}
RebalancePartitioner
Sends records to the downstream tasks one after another in round-robin order.
/**
 * Sends records to the downstream tasks in round-robin order.
 * @param <T>
 */
@Internal
public class RebalancePartitioner<T> extends StreamPartitioner<T> {

    private static final long serialVersionUID = 1L;

    private int nextChannelToSendTo;

    @Override
    public void setup(int numberOfChannels) {
        super.setup(numberOfChannels);
        // Initialize the channel id with a pseudo-random number in [0, numberOfChannels)
        nextChannelToSendTo = ThreadLocalRandom.current().nextInt(numberOfChannels);
    }

    @Override
    public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
        // Cycle through the downstream tasks. For example, if nextChannelToSendTo starts at 0
        // and numberOfChannels (the downstream parallelism) is 2, the first record goes to
        // task ID = 1, the second to task ID = 0, the third to task ID = 1, and so on.
        nextChannelToSendTo = (nextChannelToSendTo + 1) % numberOfChannels;
        return nextChannelToSendTo;
    }

    public StreamPartitioner<T> copy() {
        return this;
    }

    @Override
    public String toString() {
        return "REBALANCE";
    }
}
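The round-robin arithmetic can be tried out in isolation. The following is a minimal stand-alone sketch, not the Flink class: RoundRobinSketch and its initialChannel parameter are hypothetical names, and the starting channel is passed in explicitly so the sequence is reproducible (the real partitioner picks it at random in setup()).

```java
import java.util.ArrayList;
import java.util.List;

/** Stand-alone sketch of the round-robin channel selection; not the Flink class. */
final class RoundRobinSketch {
    private final int numberOfChannels;
    private int nextChannelToSendTo;

    // initialChannel is a hypothetical parameter used here for reproducibility.
    RoundRobinSketch(int numberOfChannels, int initialChannel) {
        this.numberOfChannels = numberOfChannels;
        this.nextChannelToSendTo = initialChannel;
    }

    // Same update rule as in the selectChannel method shown above.
    int selectChannel() {
        nextChannelToSendTo = (nextChannelToSendTo + 1) % numberOfChannels;
        return nextChannelToSendTo;
    }

    /** Returns the channels chosen for the first `records` records. */
    static List<Integer> firstChannels(int numberOfChannels, int initialChannel, int records) {
        RoundRobinSketch sketch = new RoundRobinSketch(numberOfChannels, initialChannel);
        List<Integer> chosen = new ArrayList<>();
        for (int i = 0; i < records; i++) {
            chosen.add(sketch.selectChannel());
        }
        return chosen;
    }

    public static void main(String[] args) {
        // Three downstream tasks, starting value 0
        System.out.println(firstChannels(3, 0, 6)); // prints [1, 2, 0, 1, 2, 0]
    }
}
```

Because the counter is incremented before it is returned, the first record goes to the channel after the initial value, which matches the example in the code comment.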
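RescalePartitioner
Based on the parallelism of the upstream and downstream operators, each upstream subtask emits records round-robin to the subset of downstream instances it is connected to.
Example: if the upstream parallelism is 2 and the downstream parallelism is 4, one upstream subtask emits records round-robin to two of the downstream subtasks, and the other upstream subtask emits round-robin to the remaining two.
If the upstream parallelism is 4 and the downstream parallelism is 2, two upstream subtasks emit to one downstream subtask, and the other two upstream subtasks emit to the other downstream subtask.
@Internal
public class RescalePartitioner<T> extends StreamPartitioner<T> {

    private static final long serialVersionUID = 1L;

    private int nextChannelToSendTo = -1;

    @Override
    public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
        // Advance to the next channel and wrap around to 0 at the end
        if (++nextChannelToSendTo >= numberOfChannels) {
            nextChannelToSendTo = 0;
        }
        return nextChannelToSendTo;
    }

    public StreamPartitioner<T> copy() {
        return this;
    }

    @Override
    public String toString() {
        return "RESCALE";
    }
}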
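The two examples above (2 to 4 and 4 to 2) can be reproduced with a small helper. This is only an illustration of the grouping described in the text, not Flink's actual wiring code: RescaleGroupingSketch and reachableDownstream are hypothetical names, and the sketch assumes that the larger parallelism is an exact multiple of the smaller one.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrates which downstream subtasks an upstream subtask reaches under rescale. */
final class RescaleGroupingSketch {

    /**
     * Returns the downstream subtask indices reachable from one upstream subtask.
     * Assumption: the larger parallelism is an exact multiple of the smaller one.
     */
    static List<Integer> reachableDownstream(
            int upstreamIndex, int upstreamParallelism, int downstreamParallelism) {
        List<Integer> targets = new ArrayList<>();
        if (downstreamParallelism >= upstreamParallelism) {
            // Each upstream subtask owns a contiguous block of downstream subtasks.
            int factor = downstreamParallelism / upstreamParallelism;
            for (int i = 0; i < factor; i++) {
                targets.add(upstreamIndex * factor + i);
            }
        } else {
            // Several upstream subtasks share a single downstream subtask.
            int factor = upstreamParallelism / downstreamParallelism;
            targets.add(upstreamIndex / factor);
        }
        return targets;
    }

    public static void main(String[] args) {
        System.out.println(reachableDownstream(0, 2, 4)); // prints [0, 1]
        System.out.println(reachableDownstream(1, 2, 4)); // prints [2, 3]
        System.out.println(reachableDownstream(3, 4, 2)); // prints [1]
    }
}
```

Within its own group, each upstream subtask then cycles through the channels exactly as selectChannel does: the numberOfChannels it sees is the size of the group, not the full downstream parallelism.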
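Note
The execution graph in Flink has four layers: StreamGraph -> JobGraph -> ExecutionGraph -> physical execution graph.
StreamGraph: the initial graph generated from the code the user writes with the Stream API. It represents the topology of the program.
JobGraph: the StreamGraph is optimized into the JobGraph, which is the data structure submitted to the JobManager. The main optimization is chaining several eligible nodes together into a single node, which reduces the serialization, deserialization, and transfer cost of moving data between nodes.
ExecutionGraph: the JobManager generates the ExecutionGraph from the JobGraph. The ExecutionGraph is the parallelized version of the JobGraph and is the core data structure of the scheduling layer.
Physical execution graph: the "graph" formed once the JobManager has scheduled the job according to the ExecutionGraph and deployed the Tasks on the TaskManagers. It is not a concrete data structure.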
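StreamingJobGraphGenerator is the class that converts the StreamGraph into the JobGraph. In this class, ForwardPartitioner and RescalePartitioner are assigned the POINTWISE distribution pattern, and all other partitioners are assigned ALL_TO_ALL. The code is as follows:
if (partitioner instanceof ForwardPartitioner || partitioner instanceof RescalePartitioner) {
    jobEdge = downStreamVertex.connectNewDataSetAsInput(
        headVertex,
        // Each instance (subtask) of the upstream (producer) operator connects to
        // one or more instances (subtasks) of the downstream (consumer) operator
        DistributionPattern.POINTWISE,
        resultPartitionType);
} else {
    jobEdge = downStreamVertex.connectNewDataSetAsInput(
        headVertex,
        // Each instance (subtask) of the upstream (producer) operator connects to
        // all instances (subtasks) of the downstream (consumer) operator
        DistributionPattern.ALL_TO_ALL,
        resultPartitionType);
}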
ForwardPartitioner
Sends each record to the first channel of the corresponding downstream task. The upstream and downstream operators must have the same parallelism, i.e. upstream and downstream subtasks are in a 1:1 relationship.
/**
 * Sends to the first channel of the corresponding downstream task.
 * @param <T>
 */
@Internal
public class ForwardPartitioner<T> extends StreamPartitioner<T> {

    private static final long serialVersionUID = 1L;

    @Override
    public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
        return 0;
    }

    public StreamPartitioner<T> copy() {
        return this;
    }

    @Override
    public String toString() {
        return "FORWARD";
    }
}
Note
When no partitioner is specified between an upstream and a downstream operator, ForwardPartitioner is used if the two operators have the same parallelism; otherwise RebalancePartitioner is used. ForwardPartitioner requires the upstream and downstream parallelism to be equal, and an exception is thrown if they differ.
// With no partitioner specified: use ForwardPartitioner when the upstream and downstream
// parallelism match, otherwise use RebalancePartitioner
if (partitioner == null && upstreamNode.getParallelism() == downstreamNode.getParallelism()) {
    partitioner = new ForwardPartitioner<Object>();
} else if (partitioner == null) {
    partitioner = new RebalancePartitioner<Object>();
}

if (partitioner instanceof ForwardPartitioner) {
    // Throw if the upstream and downstream parallelism differ
    if (upstreamNode.getParallelism() != downstreamNode.getParallelism()) {
        throw new UnsupportedOperationException("Forward partitioning does not allow " +
            "change of parallelism. Upstream operation: " + upstreamNode +
            " parallelism: " + upstreamNode.getParallelism() +
            ", downstream operation: " + downstreamNode +
            " parallelism: " + downstreamNode.getParallelism() +
            " You must use another partitioning strategy, such as broadcast, rebalance, shuffle or global.");
    }
}
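The decision rule can be condensed into a small stand-alone function. This sketch is only a model of the snippet above: DefaultPartitionerSketch, the Strategy enum, and resolve are hypothetical names, with the enum standing in for the real partitioner classes.

```java
/** Models the default-partitioner decision; names are hypothetical stand-ins. */
final class DefaultPartitionerSketch {

    // Stand-ins for ForwardPartitioner, RebalancePartitioner, and any other partitioner.
    enum Strategy { FORWARD, REBALANCE, OTHER }

    /**
     * @param requested the partitioner set by the user, or null if none was specified
     * @return the partitioner that ends up being used
     */
    static Strategy resolve(Strategy requested, int upstreamParallelism, int downstreamParallelism) {
        Strategy chosen = requested;
        if (chosen == null) {
            // No explicit choice: forward for equal parallelism, rebalance otherwise.
            chosen = (upstreamParallelism == downstreamParallelism)
                    ? Strategy.FORWARD
                    : Strategy.REBALANCE;
        }
        if (chosen == Strategy.FORWARD && upstreamParallelism != downstreamParallelism) {
            // An explicitly requested forward edge cannot change parallelism.
            throw new UnsupportedOperationException(
                    "Forward partitioning does not allow change of parallelism.");
        }
        return chosen;
    }

    public static void main(String[] args) {
        System.out.println(resolve(null, 4, 4)); // prints FORWARD
        System.out.println(resolve(null, 2, 4)); // prints REBALANCE
    }
}
```

The exception can therefore only be reached when forward partitioning was requested explicitly, because the default path never selects it for mismatched parallelism.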
KeyGroupStreamPartitioner
Selects the downstream subtask according to the key group index of the record's key.
/**
 * Selects the downstream subtask according to the key group index of the key.
 * @param <T>
 * @param <K>
 */
@Internal
public class KeyGroupStreamPartitioner<T, K> extends StreamPartitioner<T> implements ConfigurableStreamPartitioner {
    ...

    @Override
    public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
        K key;
        try {
            key = keySelector.getKey(record.getInstance().getValue());
        } catch (Exception e) {
            throw new RuntimeException("Could not extract key from " + record.getInstance().getValue(), e);
        }
        // Delegates to KeyGroupRangeAssignment.assignKeyToParallelOperator, shown below
        return KeyGroupRangeAssignment.assignKeyToParallelOperator(key, maxParallelism, numberOfChannels);
    }
    ...
}
org.apache.flink.runtime.state.KeyGroupRangeAssignment
public final class KeyGroupRangeAssignment {
    ...

    /**
     * Assigns the key to the index of a parallel operator instance. This index is the
     * routing information for the key, i.e. which downstream task the key is sent to.
     */
    public static int assignKeyToParallelOperator(Object key, int maxParallelism, int parallelism) {
        Preconditions.checkNotNull(key, "Assigned key must not be null!");
        return computeOperatorIndexForKeyGroup(maxParallelism, parallelism, assignToKeyGroup(key, maxParallelism));
    }

    /**
     * Assigns the key to a key group id (keyGroupId).
     */
    public static int assignToKeyGroup(Object key, int maxParallelism) {
        Preconditions.checkNotNull(key, "Assigned key must not be null!");
        // Use the hashCode of the key
        return computeKeyGroupForKeyHash(key.hashCode(), maxParallelism);
    }

    /**
     * Computes the key group id (keyGroupId) from the hash of the key.
     */
    public static int computeKeyGroupForKeyHash(int keyHash, int maxParallelism) {
        // Murmur-hash the key hash, then take it modulo maxParallelism to get the keyGroupId
        return MathUtils.murmurHash(keyHash) % maxParallelism;
    }

    // Computes the partition index, i.e. which downstream operator instance
    // the key group is sent to
    public static int computeOperatorIndexForKeyGroup(int maxParallelism, int parallelism, int keyGroupId) {
        return keyGroupId * parallelism / maxParallelism;
    }
    ...
}
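To see how the two-step mapping (key -> key group -> operator index) behaves, here is a minimal stand-alone sketch. It mirrors the formulas above, with one loudly labeled assumption: nonNegativeHash is a placeholder for Flink's MathUtils.murmurHash, so the key groups it produces are not the ones a real job would compute. KeyGroupSketch is a hypothetical class name.

```java
/** Stand-alone sketch of the key -> key group -> operator index mapping. */
final class KeyGroupSketch {

    // ASSUMPTION: placeholder for MathUtils.murmurHash. It only guarantees a
    // non-negative result; it does not reproduce Flink's hash values.
    static int nonNegativeHash(int keyHash) {
        return keyHash & Integer.MAX_VALUE;
    }

    /** Maps a key to a key group in [0, maxParallelism). */
    static int assignToKeyGroup(Object key, int maxParallelism) {
        return nonNegativeHash(key.hashCode()) % maxParallelism;
    }

    /** Same formula as in the source above: maps a key group to an operator index. */
    static int computeOperatorIndexForKeyGroup(int maxParallelism, int parallelism, int keyGroupId) {
        return keyGroupId * parallelism / maxParallelism;
    }

    /** Composes the two steps. */
    static int assignKeyToParallelOperator(Object key, int maxParallelism, int parallelism) {
        return computeOperatorIndexForKeyGroup(
                maxParallelism, parallelism, assignToKeyGroup(key, maxParallelism));
    }

    public static void main(String[] args) {
        int maxParallelism = 128;
        int parallelism = 4;
        // With 128 key groups and 4 subtasks, each subtask owns 32 consecutive key groups.
        System.out.println(computeOperatorIndexForKeyGroup(maxParallelism, parallelism, 31));  // prints 0
        System.out.println(computeOperatorIndexForKeyGroup(maxParallelism, parallelism, 32));  // prints 1
        System.out.println(computeOperatorIndexForKeyGroup(maxParallelism, parallelism, 127)); // prints 3
    }
}
```

The integer division keeps consecutive key groups on the same subtask, so a key's key group stays fixed when the parallelism changes and only the range of key groups owned by each subtask moves.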
CustomPartitionerWrapper
Routes each record to a downstream channel by calling the partition method of a user-defined Partitioner instance.
public class CustomPartitionerWrapper<K, T> extends StreamPartitioner<T> {

    private static final long serialVersionUID = 1L;

    Partitioner<K> partitioner;
    KeySelector<T, K> keySelector;

    public CustomPartitionerWrapper(Partitioner<K> partitioner, KeySelector<T, K> keySelector) {
        this.partitioner = partitioner;
        this.keySelector = keySelector;
    }

    @Override
    public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
        K key;
        try {
            key = keySelector.getKey(record.getInstance().getValue());
        } catch (Exception e) {
            throw new RuntimeException("Could not extract key from " + record.getInstance(), e);
        }
        // The user implements the Partitioner interface and overrides its partition method
        return partitioner.partition(key, numberOfChannels);
    }

    @Override
    public StreamPartitioner<T> copy() {
        return this;
    }

    @Override
    public String toString() {
        return "CUSTOM";
    }
}
For example:
public class CustomPartitioner implements Partitioner<String> {

    // key: the value used to decide the partition
    // numPartitions: the parallelism of the downstream operator
    @Override
    public int partition(String key, int numPartitions) {
        // Define the partitioning strategy here
        return key.length() % numPartitions;
    }
}
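The partitioning rule itself can be checked without a Flink cluster. In the sketch below, the nested Partitioner interface is a local stand-in that only mirrors the shape of the partition(key, numPartitions) method used above; it is not the Flink interface, and CustomPartitionSketch, BY_LENGTH, and BY_HASH are hypothetical names.

```java
/** Exercises custom partitioning rules against a local stand-in interface. */
final class CustomPartitionSketch {

    /** Local stand-in with the same method shape as the Partitioner used above. */
    interface Partitioner<K> {
        int partition(K key, int numPartitions);
    }

    // Same rule as the CustomPartitioner example: partition by key length.
    static final Partitioner<String> BY_LENGTH =
            (key, numPartitions) -> key.length() % numPartitions;

    // Alternative rule: partition by hash. floorMod keeps the result inside
    // [0, numPartitions) even when hashCode() is negative.
    static final Partitioner<String> BY_HASH =
            (key, numPartitions) -> Math.floorMod(key.hashCode(), numPartitions);

    public static void main(String[] args) {
        System.out.println(BY_LENGTH.partition("flink", 4));       // prints 1 (5 % 4)
        System.out.println(BY_LENGTH.partition("partitioner", 4)); // prints 3 (11 % 4)
    }
}
```

Whatever rule is chosen, the returned value is used directly as the channel index in selectChannel, so it has to fall in the range [0, numPartitions).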