中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

ShardingContent的功能有哪些

發布時間:2021-09-10 14:12:34 來源:億速云 閱讀:165 作者:柒染 欄目:大數據

這期內容當中小編將會給大家帶來有關ShardingContent的功能有哪些,文章內容豐富且以專業的角度為大家分析和敘述,閱讀完這篇文章希望大家可以有所收獲。

ShardingContent主要做了那些功能呢?主要有兩部分:

  • 數據源分片元數據

        主要根據數據源連接獲取對應的url,通過解析url參數來封裝數據源分片元數據;數據源分片元數據主要后續SQL路由DCL(比如:授權、創建用戶等)操作使用

  • 表分片元數據

        主要根據數據節點來獲取真實表的元數據;而表分片元數據主要后續SQL解析填充使用

源碼分析

1.ShardingContext構造,主要分析ShardingTableMetaData

public  ShardingContext(final Map<String, DataSource> dataSourceMap, final ShardingRule shardingRule, final DatabaseType databaseType, final Properties props) throws SQLException {
        this.shardingRule = shardingRule;
        //獲取數據源原始元數據信息
        this.cachedDatabaseMetaData = createCachedDatabaseMetaData(dataSourceMap);
        //數據源類型
        this.databaseType = databaseType;
        //sharding 配置參數
        //比如:sql打印、線程池大小配置等
        shardingProperties = new ShardingProperties(null == props ? new Properties() : props);
        //Statement、PrepareStatement執行線程池大小
        //一個分片數據源將使用獨立的線程池,它不會在同一個JVM中共享線程池甚至不同的數據源
        //默認無限制
        int executorSize = shardingProperties.getValue(ShardingPropertiesConstant.EXECUTOR_SIZE);
        //執行引擎
        executeEngine = new ShardingExecuteEngine(executorSize);
        //數據源分片元數據
        //以mysql為例,建立連接獲取mysql url,將解析后的url參數信息封裝到ShardingDataSourceMetaData
        ShardingDataSourceMetaData shardingDataSourceMetaData = new ShardingDataSourceMetaData(getDataSourceURLs(dataSourceMap), shardingRule, databaseType);
        //表分片元數據
        //以mysql為例,會建立連接獲取表的元信息(字段、字段類型、索引)
        ShardingTableMetaData shardingTableMetaData = new ShardingTableMetaData(getTableMetaDataInitializer(dataSourceMap, shardingDataSourceMetaData).load(shardingRule));
        //封裝數據源分片元數據、表分片元數據
        metaData = new ShardingMetaData(shardingDataSourceMetaData, shardingTableMetaData);
        //解析結果緩存
        parsingResultCache = new ParsingResultCache();
    }

//
    private TableMetaDataInitializer getTableMetaDataInitializer(final Map<String, DataSource> dataSourceMap, final ShardingDataSourceMetaData shardingDataSourceMetaData) {
        return new TableMetaDataInitializer(shardingDataSourceMetaData, executeEngine, new JDBCTableMetaDataConnectionManager(dataSourceMap),
                shardingProperties.<Integer>getValue(ShardingPropertiesConstant.MAX_CONNECTIONS_SIZE_PER_QUERY),
                shardingProperties.<Boolean>getValue(ShardingPropertiesConstant.CHECK_TABLE_METADATA_ENABLED));
    }

2.加載TableMetaDataInitializer#load

    public TableMetaDataInitializer(final ShardingDataSourceMetaData shardingDataSourceMetaData, final ShardingExecuteEngine executeEngine, 
                                    final TableMetaDataConnectionManager connectionManager, final int maxConnectionsSizePerQuery, final boolean isCheckingMetaData) {
        //數據源分片元數據
        this.shardingDataSourceMetaData = shardingDataSourceMetaData;
        //數據源連接管理器
        this.connectionManager = connectionManager;
        //表元數據加載器
        tableMetaDataLoader = new TableMetaDataLoader(shardingDataSourceMetaData, executeEngine, connectionManager, maxConnectionsSizePerQuery, isCheckingMetaData);
    }

    /**
     * Load table meta data.
     *
     * @param logicTableName logic table name
     * @param shardingRule sharding rule
     * @return table meta data
     */
    @SneakyThrows
    public TableMetaData load(final String logicTableName, final ShardingRule shardingRule) {
        return tableMetaDataLoader.load(logicTableName, shardingRule);
    }
    
    /**
     * Load all table meta data.
     * 
     * @param shardingRule sharding rule
     * @return all table meta data
     */
    @SneakyThrows
    public Map<String, TableMetaData> load(final ShardingRule shardingRule) {
        Map<String, TableMetaData> result = new HashMap<>();
        //加載分片表
        result.putAll(loadShardingTables(shardingRule));
        //加載未分片表
        result.putAll(loadDefaultTables(shardingRule));
        return result;
    }
    
    private Map<String, TableMetaData> loadShardingTables(final ShardingRule shardingRule) throws SQLException {
        Map<String, TableMetaData> result = new HashMap<>(shardingRule.getTableRules().size(), 1);
        for (TableRule each : shardingRule.getTableRules()) {
            //加載邏輯表對應真實表的元數據
            //邏輯表:表元數據
            result.put(each.getLogicTable(), tableMetaDataLoader.load(each.getLogicTable(), shardingRule));
        }
        return result;
    }
    
    private Map<String, TableMetaData> loadDefaultTables(final ShardingRule shardingRule) throws SQLException {
        Map<String, TableMetaData> result = new HashMap<>(shardingRule.getTableRules().size(), 1);
        //查詢默認數據源,沒有則查找主庫
        Optional<String> actualDefaultDataSourceName = shardingRule.findActualDefaultDataSourceName();
        if (actualDefaultDataSourceName.isPresent()) {
            //獲取所有表元數據
            //真實表:表元數據
            for (String each : getAllTableNames(actualDefaultDataSourceName.get())) {
                result.put(each, tableMetaDataLoader.load(each, shardingRule));
            }
        }
        return result;
    }
    
    private Collection<String> getAllTableNames(final String dataSourceName) throws SQLException {
        Collection<String> result = new LinkedHashSet<>();
        DataSourceMetaData dataSourceMetaData = shardingDataSourceMetaData.getActualDataSourceMetaData(dataSourceName);
        String catalog = null == dataSourceMetaData ? null : dataSourceMetaData.getSchemaName();
        try (Connection connection = connectionManager.getConnection(dataSourceName);
             ResultSet resultSet = connection.getMetaData().getTables(catalog, getCurrentSchemaName(connection), null, new String[]{"TABLE"})) {
            while (resultSet.next()) {
                String tableName = resultSet.getString("TABLE_NAME");
                if (!tableName.contains("$") && !tableName.contains("/")) {
                    result.add(tableName);
                }
            }
        }
        return result;
    }
    
    private String getCurrentSchemaName(final Connection connection) throws SQLException {
        try {
            return connection.getSchema();
        } catch (final AbstractMethodError | SQLFeatureNotSupportedException ignore) {
            return null;
        }
    }

3.加載表元數據TableMetaDataLoader#load

    /**
     * Load table meta data.
     *
     * @param logicTableName logic table name
     * @param shardingRule sharding rule
     * @return table meta data
     * @throws SQLException SQL exception
     */
    public TableMetaData load(final String logicTableName, final ShardingRule shardingRule) throws SQLException {
        //獲取表元數據
        List<TableMetaData> actualTableMetaDataList = load(getDataNodeGroups(logicTableName, shardingRule), shardingRule.getShardingDataSourceNames());
        //檢查actualTableMetaDataList的元數據
        checkUniformed(logicTableName, actualTableMetaDataList);
        return actualTableMetaDataList.iterator().next();
    }
    
    private List<TableMetaData> load(final Map<String, List<DataNode>> dataNodeGroups, final ShardingDataSourceNames shardingDataSourceNames) throws SQLException {
        //將封裝好的數據節點組提交給執行引擎執行
        return executeEngine.groupExecute(getDataNodeGroups(dataNodeGroups), new ShardingGroupExecuteCallback<DataNode, TableMetaData>() {
            
            @Override
            public Collection<TableMetaData> execute(final Collection<DataNode> dataNodes, final boolean isTrunkThread, final Map<String, Object> shardingExecuteDataMap) throws SQLException {
                String dataSourceName = dataNodes.iterator().next().getDataSourceName();
                DataSourceMetaData dataSourceMetaData = shardingDataSourceMetaData.getActualDataSourceMetaData(dataSourceName);
                String catalog = null == dataSourceMetaData ? null : dataSourceMetaData.getSchemaName();
                return load(shardingDataSourceNames.getRawMasterDataSourceName(dataSourceName), catalog, dataNodes);
            }
        });
    }
    
    private Collection<TableMetaData> load(final String dataSourceName, final String catalog, final Collection<DataNode> dataNodes) throws SQLException {
        Collection<TableMetaData> result = new LinkedList<>();
        try (Connection connection = connectionManager.getConnection(dataSourceName)) {
            for (DataNode each : dataNodes) {
                //獲取表元數據
                result.add(createTableMetaData(connection, catalog, each.getTableName()));
            }
        }
        return result;
    }
    
    private Map<String, List<DataNode>> getDataNodeGroups(final String logicTableName, final ShardingRule shardingRule) {
        //根據邏輯表獲取對應的數據源:真實表數據節點
        //比如:
        //ds_0 -> [ds_0:t_order_0, ds_0:t_order_1]
        //ds_1 -> [ds_1.t_order_0, ds_1.t_order_1]
        Map<String, List<DataNode>> result = shardingRule.getTableRule(logicTableName).getDataNodeGroups();
        //默認false,設置為true會處理所有數據節點真實表
        if (isCheckingMetaData) {
            return result;
        }
        //返回一個數據節點即可
        String firstKey = result.keySet().iterator().next();
        return Collections.singletonMap(firstKey, Collections.singletonList(result.get(firstKey).get(0)));
    }

    /**
     * 將數據節點組封裝成分片執行組
     *
     * @param dataNodeGroups 數據節點組
     * <pre>
     *      ds_0 -> [ds_0:t_order_0, ds_0:t_order_1]
     * </pre>
     * @return
     */
    private Collection<ShardingExecuteGroup<DataNode>> getDataNodeGroups(final Map<String, List<DataNode>> dataNodeGroups) {
        Collection<ShardingExecuteGroup<DataNode>> result = new LinkedList<>();
        //遍歷對應數據源下的數據節點
        for (Entry<String, List<DataNode>> entry : dataNodeGroups.entrySet()) {
            //封裝分片執行組ShardingExecuteGroup
            result.addAll(getDataNodeGroups(entry.getValue()));
        }
        return result;
    }
    
    private Collection<ShardingExecuteGroup<DataNode>> getDataNodeGroups(final List<DataNode> dataNodes) {
        Collection<ShardingExecuteGroup<DataNode>> result = new LinkedList<>();
        //maxConnectionsSizePerQuery最大查詢連接數默認為1
        //將dataNodes換分Math.max份
        for (List<DataNode> each : Lists.partition(dataNodes, Math.max(dataNodes.size() / maxConnectionsSizePerQuery, 1))) {
            result.add(new ShardingExecuteGroup<>(each));
        }
        return result;
    }
    
    private TableMetaData createTableMetaData(final Connection connection, final String catalog, final String actualTableName) throws SQLException {
        //判斷表是否存在
        if (isTableExist(connection, catalog, actualTableName)) {
            //封裝表元數據
            return new TableMetaData(getColumnMetaDataList(connection, catalog, actualTableName), getLogicIndexes(connection, catalog, actualTableName));
        }
        return new TableMetaData(Collections.<ColumnMetaData>emptyList(), Collections.<String>emptySet());
    }
    
    private boolean isTableExist(final Connection connection, final String catalog, final String actualTableName) throws SQLException {
        try (ResultSet resultSet = connection.getMetaData().getTables(catalog, null, actualTableName, null)) {
            return resultSet.next();
        }
    }

    /**
     * 獲取表字段元數據
     *
     * @param connection 連接
     * @param catalog schema
     * @param actualTableName 真實表
     * @return
     * @throws SQLException
     */
    private List<ColumnMetaData> getColumnMetaDataList(final Connection connection, final String catalog, final String actualTableName) throws SQLException {
        List<ColumnMetaData> result = new LinkedList<>();
        Collection<String> primaryKeys = getPrimaryKeys(connection, catalog, actualTableName);
        try (ResultSet resultSet = connection.getMetaData().getColumns(catalog, null, actualTableName, "%")) {
            while (resultSet.next()) {
                String columnName = resultSet.getString("COLUMN_NAME");
                String columnType = resultSet.getString("TYPE_NAME");
                result.add(new ColumnMetaData(columnName, columnType, primaryKeys.contains(columnName)));
            }
        }
        return result;
    }

    /**
     * 獲取表主鍵
     */
    private Collection<String> getPrimaryKeys(final Connection connection, final String catalog, final String actualTableName) throws SQLException {
        Collection<String> result = new HashSet<>();
        try (ResultSet resultSet = connection.getMetaData().getPrimaryKeys(catalog, null, actualTableName)) {
            while (resultSet.next()) {
                result.add(resultSet.getString("COLUMN_NAME"));
            }
        }
        return result;
    }

    /**
     * 獲取表索引
     */
    private Collection<String> getLogicIndexes(final Connection connection, final String catalog, final String actualTableName) throws SQLException {
        Collection<String> result = new HashSet<>();
        try (ResultSet resultSet = connection.getMetaData().getIndexInfo(catalog, catalog, actualTableName, false, false)) {
            while (resultSet.next()) {
                Optional<String> logicIndex = getLogicIndex(resultSet.getString("INDEX_NAME"), actualTableName);
                if (logicIndex.isPresent()) {
                    result.add(logicIndex.get());
                }
            }
        }
        return result;
    }
    
    private Optional<String> getLogicIndex(final String actualIndexName, final String actualTableName) {
        //索引要以`_tableName`命名,比如:
        //idx_t_order
        String indexNameSuffix = "_" + actualTableName;
        if (actualIndexName.contains(indexNameSuffix)) {
            return Optional.of(actualIndexName.replace(indexNameSuffix, ""));
        }
        return Optional.absent();
    }

4.執行ShardingExecuteEngine#groupExecute

    /**
     * Execute for group.
     *
     * @param inputGroups input groups
     * @param callback sharding execute callback
     * @param <I> type of input value
     * @param <O> type of return value
     * @return execute result
     * @throws SQLException throw if execute failure
     */
    public <I, O> List<O> groupExecute(final Collection<ShardingExecuteGroup<I>> inputGroups, final ShardingGroupExecuteCallback<I, O> callback) throws SQLException {
        return groupExecute(inputGroups, null, callback, false);
    }
    
    /**
     * Execute for group.
     *
     * @param inputGroups input groups
     * @param firstCallback first sharding execute callback
     * @param callback sharding execute callback
     * @param serial whether using multi thread execute or not
     * @param <I> type of input value
     * @param <O> type of return value
     * @return execute result
     * @throws SQLException throw if execute failure
     */
    public <I, O> List<O> groupExecute(
        final Collection<ShardingExecuteGroup<I>> inputGroups, final ShardingGroupExecuteCallback<I, O> firstCallback, final ShardingGroupExecuteCallback<I, O> callback, final boolean serial)
        throws SQLException {
        if (inputGroups.isEmpty()) {
            return Collections.emptyList();
        }
        //serial: 串行
        //parallel: 并行
        return serial ? serialExecute(inputGroups, firstCallback, callback) : parallelExecute(inputGroups, firstCallback, callback);
    }
    
    private <I, O> List<O> serialExecute(final Collection<ShardingExecuteGroup<I>> inputGroups, final ShardingGroupExecuteCallback<I, O> firstCallback,
                                         final ShardingGroupExecuteCallback<I, O> callback) throws SQLException {
        Iterator<ShardingExecuteGroup<I>> inputGroupsIterator = inputGroups.iterator();
        ShardingExecuteGroup<I> firstInputs = inputGroupsIterator.next();
        //單獨執行第一個組
        //當firstCallback不為空時使用firstCallback,否則使用callback
        List<O> result = new LinkedList<>(syncGroupExecute(firstInputs, null == firstCallback ? callback : firstCallback));
        //遍歷執行
        for (ShardingExecuteGroup<I> each : Lists.newArrayList(inputGroupsIterator)) {
            result.addAll(syncGroupExecute(each, callback));
        }
        return result;
    }
    
    private <I, O> List<O> parallelExecute(final Collection<ShardingExecuteGroup<I>> inputGroups, final ShardingGroupExecuteCallback<I, O> firstCallback,
                                           final ShardingGroupExecuteCallback<I, O> callback) throws SQLException {
        Iterator<ShardingExecuteGroup<I>> inputGroupsIterator = inputGroups.iterator();
        //獲取第一個組
        ShardingExecuteGroup<I> firstInputs = inputGroupsIterator.next();
        //將剩余組提交到線程池中執行
        Collection<ListenableFuture<Collection<O>>> restResultFutures = asyncGroupExecute(Lists.newArrayList(inputGroupsIterator), callback);
        //執行第一個組,合并同步執行、異步執行結果
        return getGroupResults(syncGroupExecute(firstInputs, null == firstCallback ? callback : firstCallback), restResultFutures);
    }

    /**
     * 異步執行
     */
    private <I, O> Collection<ListenableFuture<Collection<O>>> asyncGroupExecute(final List<ShardingExecuteGroup<I>> inputGroups, final ShardingGroupExecuteCallback<I, O> callback) {
        Collection<ListenableFuture<Collection<O>>> result = new LinkedList<>();
        for (ShardingExecuteGroup<I> each : inputGroups) {
            result.add(asyncGroupExecute(each, callback));
        }
        return result;
    }
    
    private <I, O> ListenableFuture<Collection<O>> asyncGroupExecute(final ShardingExecuteGroup<I> inputGroup, final ShardingGroupExecuteCallback<I, O> callback) {
        final Map<String, Object> dataMap = ShardingExecuteDataMap.getDataMap();
        //提交到線程池
        return executorService.submit(new Callable<Collection<O>>() {
            
            @Override
            public Collection<O> call() throws SQLException {
                return callback.execute(inputGroup.getInputs(), false, dataMap);
            }
        });
    }

    /**
     * 同步執行
     */
    private <I, O> Collection<O> syncGroupExecute(final ShardingExecuteGroup<I> executeGroup, final ShardingGroupExecuteCallback<I, O> callback) throws SQLException {
        return callback.execute(executeGroup.getInputs(), true, ShardingExecuteDataMap.getDataMap());
    }
    
    private <O> List<O> getGroupResults(final Collection<O> firstResults, final Collection<ListenableFuture<Collection<O>>> restFutures) throws SQLException {
        List<O> result = new LinkedList<>(firstResults);
        for (ListenableFuture<Collection<O>> each : restFutures) {
            try {
                result.addAll(each.get());
            } catch (final InterruptedException | ExecutionException ex) {
                return throwException(ex);
            }
        }
        return result;
    }

上述就是小編為大家分享的ShardingContent的功能有哪些了,如果剛好有類似的疑惑,不妨參照上述分析進行理解。如果想知道更多相關知識,歡迎關注億速云行業資訊頻道。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

桂平市| 博兴县| 柞水县| 金溪县| 温泉县| 峨眉山市| 铜川市| 兴义市| 岑溪市| 太原市| 苗栗市| 荣成市| 尼勒克县| 芜湖县| 三台县| 阿荣旗| 漳平市| 平塘县| 白河县| 乐清市| 柘荣县| 福安市| 昌图县| 长汀县| 调兵山市| 宜丰县| 洛阳市| 嘉定区| 新闻| 祥云县| 浦江县| 威宁| 陆河县| 南平市| 潜山县| 乌恰县| 慈利县| 鄯善县| 高青县| 都安| 五华县|