中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

sharding中怎么執行jdbc

發布時間:2021-07-19 15:04:25 來源:億速云 閱讀:127 作者:Leah 欄目:大數據

sharding中怎么執行jdbc,很多新手對此不是很清楚,為了幫助大家解決這個難題,下面小編將為大家詳細講解,有這方面需求的人可以來學習下,希望你能有所收獲。

  • 內存限制模式:使用此模式的前提是,ShardingSphere對一次操作所耗費的數據庫連接數量不做限制。如果實際執行的SQL需要對某數據庫實例中的200張表做操作,則對每張表創建一個新的數據庫連接,并通過多線程的方式并發處理,以達成執行效率最大化。并且在SQL滿足條件情況下,優先選擇流式歸并,以防止出現內存溢出或避免頻繁垃圾回收情況

  • 連接限制模式:使用此模式的前提是,ShardingSphere嚴格控制對一次操作所耗費的數據庫連接數量。如果實際執行的SQL需要對某數據庫實例中的200張表做操作,那么只會創建唯一的數據庫連接,并對其200張表串行處理。如果一次操作中的分片散落在不同的數據庫,仍然采用多線程處理對不同庫的操作,但每個庫的每次操作仍然只創建一個唯一的數據庫連接。這樣即可以防止對一次請求對數據庫連接占用過多所帶來的問題。該模式始終選擇內存歸并

case: 本文主要以SELECT i.* FROM t_order o, t_order_item i WHERE o.order_id = i.order_id and o.order_id = 2 and o.user_id = 2一個簡單查詢語句,來分析ss大致如何來執行sql,根據分片改寫后的sql,應該是demo_ds_slave_0:SELECT * FROM t_order_0 i, t_order_item_0 o WHERE o.order_id = i.order_id and o.order_id = 2 and o.user_id = 2 來執行

準備階段

1.初始化PreparedStatementExecutor#init,封裝Statement執行單元

public final class PreparedStatementExecutor extends AbstractStatementExecutor {

    @Getter
    private final boolean returnGeneratedKeys;

    public PreparedStatementExecutor(
            final int resultSetType, final int resultSetConcurrency, final int resultSetHoldability, final boolean returnGeneratedKeys, final ShardingConnection shardingConnection) {
        super(resultSetType, resultSetConcurrency, resultSetHoldability, shardingConnection);
        this.returnGeneratedKeys = returnGeneratedKeys;
    }

    /**
     * Initialize executor.
     *
     * @param routeResult route result
     * @throws SQLException SQL exception
     */
    public void init(final SQLRouteResult routeResult) throws SQLException {
        setSqlStatement(routeResult.getOptimizedStatement().getSQLStatement());
        //添加路由單元,即數據源對應的sql單元
        getExecuteGroups().addAll(obtainExecuteGroups(routeResult.getRouteUnits()));
        //緩存statement、參數
        cacheStatements();
    }

    private Collection<ShardingExecuteGroup<StatementExecuteUnit>> obtainExecuteGroups(final Collection<RouteUnit> routeUnits) throws SQLException {
        //執行封裝Statement執行單元
        return getSqlExecutePrepareTemplate().getExecuteUnitGroups(routeUnits, new SQLExecutePrepareCallback() {

            @Override
            public List<Connection> getConnections(final ConnectionMode connectionMode, final String dataSourceName, final int connectionSize) throws SQLException {
                return PreparedStatementExecutor.super.getConnection().getConnections(connectionMode, dataSourceName, connectionSize);
            }

            @Override
            public StatementExecuteUnit createStatementExecuteUnit(final Connection connection, final RouteUnit routeUnit, final ConnectionMode connectionMode) throws SQLException {
                return new StatementExecuteUnit(routeUnit, createPreparedStatement(connection, routeUnit.getSqlUnit().getSql()), connectionMode);
            }
        });
    }

    @SuppressWarnings("MagicConstant")
    private PreparedStatement createPreparedStatement(final Connection connection, final String sql) throws SQLException {
        return returnGeneratedKeys ? connection.prepareStatement(sql, Statement.RETURN_GENERATED_KEYS)
                : connection.prepareStatement(sql, getResultSetType(), getResultSetConcurrency(), getResultSetHoldability());
    }

   ... ...   
}

2.執行封裝Statement執行單元getSqlExecutePrepareTemplate().getExecuteUnitGroups

@RequiredArgsConstructor
public final class SQLExecutePrepareTemplate {

    private final int maxConnectionsSizePerQuery;

    /**
     * Get execute unit groups.
     *
     * @param routeUnits route units
     * @param callback SQL execute prepare callback
     * @return statement execute unit groups
     * @throws SQLException SQL exception
     */
    public Collection<ShardingExecuteGroup<StatementExecuteUnit>> getExecuteUnitGroups(final Collection<RouteUnit> routeUnits, final SQLExecutePrepareCallback callback) throws SQLException {
        return getSynchronizedExecuteUnitGroups(routeUnits, callback);
    }

    private Collection<ShardingExecuteGroup<StatementExecuteUnit>> getSynchronizedExecuteUnitGroups(
            final Collection<RouteUnit> routeUnits, final SQLExecutePrepareCallback callback) throws SQLException {
        //數據源對應sql單元集合,即demo_ds_0:[select i.* from t_order_0 i, t_order_item_0 o where i.order_id = o.order_id and i.order_id = ?]
        Map<String, List<SQLUnit>> sqlUnitGroups = getSQLUnitGroups(routeUnits);
        Collection<ShardingExecuteGroup<StatementExecuteUnit>> result = new LinkedList<>();

        for (Entry<String, List<SQLUnit>> entry : sqlUnitGroups.entrySet()) {
            //添加分片執行組
            result.addAll(getSQLExecuteGroups(entry.getKey(), entry.getValue(), callback));
        }
        return result;
    }

    private Map<String, List<SQLUnit>> getSQLUnitGroups(final Collection<RouteUnit> routeUnits) {
        Map<String, List<SQLUnit>> result = new LinkedHashMap<>(routeUnits.size(), 1);
        for (RouteUnit each : routeUnits) {
            if (!result.containsKey(each.getDataSourceName())) {
                result.put(each.getDataSourceName(), new LinkedList<SQLUnit>());
            }
            result.get(each.getDataSourceName()).add(each.getSqlUnit());
        }
        return result;
    }

    private List<ShardingExecuteGroup<StatementExecuteUnit>> getSQLExecuteGroups(
            final String dataSourceName, final List<SQLUnit> sqlUnits, final SQLExecutePrepareCallback callback) throws SQLException {
        List<ShardingExecuteGroup<StatementExecuteUnit>> result = new LinkedList<>();
        //在maxConnectionSizePerQuery允許的范圍內,當一個連接需要執行的請求數量大于1時,意味著當前的數據庫連接無法持有相應的數據結果集,則必須采用內存歸并;
        //反之,當一個連接需要執行的請求數量等于1時,意味著當前的數據庫連接可以持有相應的數據結果集,則可以采用流式歸并

        //TODO 場景:在不分庫只分表的情況下,會存在一個數據源對應多個sql單元的情況

        //計算所需要的分區大小
        int desiredPartitionSize = Math.max(0 == sqlUnits.size() % maxConnectionsSizePerQuery ? sqlUnits.size() / maxConnectionsSizePerQuery : sqlUnits.size() / maxConnectionsSizePerQuery + 1, 1);
        //按照分區大小進行分區
        //事例:
        //sqlUnits = [1, 2, 3, 4, 5]
        //desiredPartitionSize = 2
        //則結果為:[[1, 2], [3,4], [5]]
        List<List<SQLUnit>> sqlUnitPartitions = Lists.partition(sqlUnits, desiredPartitionSize);
        //maxConnectionsSizePerQuery該參數表示一次查詢時每個數據庫所允許使用的最大連接數
        //根據maxConnectionsSizePerQuery來判斷使用連接模式
        //CONNECTION_STRICTLY 連接限制模式
        //MEMORY_STRICTLY 內存限制模式
        ConnectionMode connectionMode = maxConnectionsSizePerQuery < sqlUnits.size() ? ConnectionMode.CONNECTION_STRICTLY : ConnectionMode.MEMORY_STRICTLY;
        //獲取分區大小的連接
        List<Connection> connections = callback.getConnections(connectionMode, dataSourceName, sqlUnitPartitions.size());
        int count = 0;
        //遍歷分區,將分區好的sql單元放到指定連接執行
        for (List<SQLUnit> each : sqlUnitPartitions) {
            result.add(getSQLExecuteGroup(connectionMode, connections.get(count++), dataSourceName, each, callback));
        }
        return result;
    }

    private ShardingExecuteGroup<StatementExecuteUnit> getSQLExecuteGroup(final ConnectionMode connectionMode, final Connection connection, 
                                                                          final String dataSourceName, final List<SQLUnit> sqlUnitGroup, final SQLExecutePrepareCallback callback) throws SQLException {
        List<StatementExecuteUnit> result = new LinkedList<>();
        //遍歷sql單元
        for (SQLUnit each : sqlUnitGroup) {
            //回調,創建statement執行單元
            result.add(callback.createStatementExecuteUnit(connection, new RouteUnit(dataSourceName, each), connectionMode));
        }
        //封裝成分片執行組
        return new ShardingExecuteGroup<>(result);
    }
}
執行階段

1.執行查詢sql

public final class PreparedStatementExecutor extends AbstractStatementExecutor {

    ... ...

    /**
     * Execute query.
     *
     * @return result set list
     * @throws SQLException SQL exception
     */
    public List<QueryResult> executeQuery() throws SQLException {
        //獲取當前是否異常值
        final boolean isExceptionThrown = ExecutorExceptionHandler.isExceptionThrown();
        //創建回調實例
        //執行SQLExecuteCallback的execute方法
        SQLExecuteCallback<QueryResult> executeCallback = new SQLExecuteCallback<QueryResult>(getDatabaseType(), isExceptionThrown) {

            @Override
            protected QueryResult executeSQL(final RouteUnit routeUnit, final Statement statement, final ConnectionMode connectionMode) throws SQLException {
                return getQueryResult(statement, connectionMode);
            }
        };
        return executeCallback(executeCallback);
    }

    ... ...

   protected final <T> List<T> executeCallback(final SQLExecuteCallback<T> executeCallback) throws SQLException {
        List<T> result = sqlExecuteTemplate.executeGroup((Collection) executeGroups, executeCallback);
        //執行完后刷新分片元數據,比如創建表、修改表etc.
        refreshShardingMetaDataIfNeeded(connection.getShardingContext(), sqlStatement);
        return result;
    }

    ... ...
}

2.通過線程池分組執行,并回調callback

@RequiredArgsConstructor
public abstract class SQLExecuteCallback<T> implements ShardingGroupExecuteCallback<StatementExecuteUnit, T> {
    //數據庫類型
    private final DatabaseType databaseType;
    //是否異常
    private final boolean isExceptionThrown;

    @Override
    public final Collection<T> execute(final Collection<StatementExecuteUnit> statementExecuteUnits, final boolean isTrunkThread,
                                       final Map<String, Object> shardingExecuteDataMap) throws SQLException {
        Collection<T> result = new LinkedList<>();
        //遍歷statement執行單元
        for (StatementExecuteUnit each : statementExecuteUnits) {
            //執行添加返回結果T
            result.add(execute0(each, isTrunkThread, shardingExecuteDataMap));
        }
        return result;
    }

    private T execute0(final StatementExecuteUnit statementExecuteUnit, final boolean isTrunkThread, final Map<String, Object> shardingExecuteDataMap) throws SQLException {
        //設置當前線程是否異常
        ExecutorExceptionHandler.setExceptionThrown(isExceptionThrown);
        //根據url獲取數據源元數據
        DataSourceMetaData dataSourceMetaData = databaseType.getDataSourceMetaData(statementExecuteUnit.getStatement().getConnection().getMetaData().getURL());
        //sql執行鉤子
        SQLExecutionHook sqlExecutionHook = new SPISQLExecutionHook();
        try {
            sqlExecutionHook.start(statementExecuteUnit.getRouteUnit(), dataSourceMetaData, isTrunkThread, shardingExecuteDataMap);
            //執行sql
            T result = executeSQL(statementExecuteUnit.getRouteUnit(), statementExecuteUnit.getStatement(), statementExecuteUnit.getConnectionMode());
            sqlExecutionHook.finishSuccess();
            return result;
        } catch (final SQLException ex) {
            sqlExecutionHook.finishFailure(ex);
            ExecutorExceptionHandler.handleException(ex);
            return null;
        }
    }

    protected abstract T executeSQL(RouteUnit routeUnit, Statement statement, ConnectionMode connectionMode) throws SQLException;
}

3.執行executeSQL,調用第三步的callback中的executeSQL,封裝ResultSet

public final class PreparedStatementExecutor extends AbstractStatementExecutor {

    ... ...

    private QueryResult getQueryResult(final Statement statement, final ConnectionMode connectionMode) throws SQLException {
        PreparedStatement preparedStatement = (PreparedStatement) statement;
        ResultSet resultSet = preparedStatement.executeQuery();
        ShardingRule shardingRule = getConnection().getShardingContext().getShardingRule();
        //緩存resultSet
        getResultSets().add(resultSet);
        //判斷ConnectionMode
        //如果是MEMORY_STRICTLY,使用流式StreamQueryResult;否則使用內存MemoryQueryResult
        return ConnectionMode.MEMORY_STRICTLY == connectionMode ? new StreamQueryResult(resultSet, shardingRule) 
                : new MemoryQueryResult(resultSet, shardingRule);
    }

    ... ...
}

看完上述內容是否對您有幫助呢?如果還想對相關知識有進一步的了解或閱讀更多相關文章,請關注億速云行業資訊頻道,感謝您對億速云的支持。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

常熟市| 孝昌县| 桐城市| 亚东县| 石狮市| 达拉特旗| 和静县| 桐乡市| 云阳县| 宁陕县| 金秀| 和平区| 三亚市| 东源县| 普定县| 大石桥市| 建平县| 永登县| 长宁区| 肥乡县| 原阳县| 临汾市| 阿合奇县| 罗江县| 乃东县| 苍南县| 新蔡县| 高州市| 沛县| 黎城县| 平和县| 龙岩市| 大连市| 澳门| 清流县| 华安县| 兴海县| 惠州市| 南昌县| 晴隆县| 满洲里市|