中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

RocketMQ中怎么實現權限控制

發布時間:2021-08-02 17:13:43 來源:億速云 閱讀:486 作者:Leah 欄目:大數據

這篇文章將為大家詳細講解有關RocketMQ中怎么實現權限控制,文章內容質量較高,因此小編分享給大家做個參考,希望大家閱讀完這篇文章后對相關知識有一定的了解。

1、簡單使用

1.1、ACL是什么

ACL是access control list的簡稱,俗稱訪問控制列表。訪問控制,基本上會涉及到用戶、資源、權限、角色等概念,那在RocketMQ中上述會對應哪些對象呢?

用戶:用戶是訪問控制的基礎要素,RocketMQ ACL必然也會引入用戶的概念,即支持用戶名、密碼。 資源:需要保護的對象,消息發送涉及的Topic、消息消費涉及的消費組,應該進行保護,故可以抽象成資源。 權限:針對資源,能進行的操作。 角色:RocketMQ中,只定義兩種角色:是否是管理員。

1.2、RocketMQ中配置ACL

acl默認的配置文件名:plain_acl.yml,需要放在${ROCKETMQ_HOME}/store/config目錄下

需要使用acl必須在服務端開啟此功能,在Broker的配置文件中配置,aclEnable = true開啟此功能

配置plain_acl.yml文件

globalWhiteRemoteAddresses:
- 10.10.15.*
- 192.168.0.*

accounts:
- accessKey: RocketMQ
  secretKey: 12345678
  whiteRemoteAddress:
  admin: false
  defaultTopicPerm: DENY
  defaultGroupPerm: SUB
  topicPerms:
  - topicA=DENY
  - topicB=PUB|SUB
  - topicC=SUB
  groupPerms:
  # the group should convert to retry topic
  - groupA=DENY
  - groupB=PUB|SUB
  - groupC=SUB

- accessKey: rocketmq2
  secretKey: 12345678
  whiteRemoteAddress: 192.168.1.*
  # if it is admin, it could access all resources
  admin: true

下面我們介紹一下plain_acl.yml文件中相關的參數含義及使用

字段取值含義
globalWhiteRemoteAddresses*;192.168.*.*;192.168.0.1全局IP白名單
accessKey字符串Access Key 用戶名
secretKey字符串Secret Key 密碼
whiteRemoteAddress*;192.168.*.*;192.168.0.1用戶IP白名單
admintrue;false是否管理員賬戶
defaultTopicPermDENY;PUB;SUB;PUB|SUB默認的Topic權限
defaultGroupPermDENY;PUB;SUB;PUB|SUB默認的ConsumerGroup權限
topicPermstopic=權限各個Topic的權限
groupPermsgroup=權限各個ConsumerGroup的權限

權限標識符的含義

權限含義
DENY拒絕
ANYPUB 或者 SUB 權限
PUB發送權限
SUB訂閱權限

處理流程

RocketMQ中怎么實現權限控制

特殊的請求例如 UPDATE_AND_CREATE_TOPIC 等,只能由 admin 賬戶進行操作;

對于某個資源,如果有顯性配置權限,則采用配置的權限;如果沒有顯性配置權限,則采用默認的權限

RocketMQ的權限控制存儲的默認實現是基于yml配置文件。用戶可以動態修改權限控制定義的屬性,而不需重新啟動Broker服務節點

如果ACL與高可用部署(Master/Slave架構)同時啟用,那么需要在Broker Master節點的${ROCKETMQ_HOME}/store/conf/plain_acl.yml配置文件中 設置全局白名單信息,即為將Slave節點的ip地址設置至Master節點plain_acl.yml配置文件的全局白名單中

1.3、代碼示例

1.3.1、生產者代碼

public class AclProducer {
    public static void main(String[] args) throws MQClientException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name", getAclRPCHook());
        producer.setNamesrvAddr("10.10.15.246:9876;10.10.15.247:9876");
        producer.start();
        for (int i = 0; i < 10; i++) {
            try {
                Message msg = new Message("topicA" ,"TagA" , ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                SendResult sendResult = producer.send(msg);
                System.out.printf("%s%n", sendResult);
            } catch (Exception e) {
                e.printStackTrace();
                Thread.sleep(1000);
            }
        }
        producer.shutdown();
    }

    static RPCHook getAclRPCHook() {
        return new AclClientRPCHook(new SessionCredentials("RocketMQ","12345678"));
    }
}

查看結果

RocketMQ中怎么實現權限控制

報錯提示topicA沒有權限,我們在plain_acl.yml文件中配置的也確實是RocketMQ用戶拒絕,生產消費topicA主題信息,我們改變主題為topicB,則發現發送消息成功,topicB=PUB|SUB設置的權限是生產消費都可以。

查看結果

RocketMQ中怎么實現權限控制

1.3.2、消費者代碼

public class AclConsumer {

    public static void main(String[] args) throws InterruptedException, MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("groupA", getAclRPCHook(),new AllocateMessageQueueAveragely());
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.subscribe("topicB", "*");
        consumer.setNamesrvAddr("10.10.15.246:9876;10.10.15.247:9876");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }

    static RPCHook getAclRPCHook() {
        return new AclClientRPCHook(new SessionCredentials("RocketMQ","12345678"));
    }
}

查看結果:發現沒有任何消息被消費,也沒有報錯信息,對于RocketMQ用戶topicB設置的就是可以可以生產可以消費的,但是我們發現其groupA=DENY是拒絕的,說明消費組是groupA則拒絕消費任何消息,我們改成groupB或者groupC查看結果。

RocketMQ中怎么實現權限控制

2、源碼分析

Broker端ACL原理圖

RocketMQ中怎么實現權限控制

2.1、Broker初始化時ACL相關操作

Broker服務啟動時創建BrokerController并初始化initialize()時調用acl相關的初始化方法initialAcl()

private void initialAcl() {
	//broker配置文件中是否開啟ACL功能,默認關閉
    if (!this.brokerConfig.isAclEnable()) {
        log.info("The broker dose not enable acl");
        return;
    }
    //獲取權限訪問校驗器的列表,加載的META-INF/service/org.apache.rocketmq.acl.AccessValidator文件中指向
    //org.apache.rocketmq.acl.plain.PlainAccessValidator,默認只有一個
    List<AccessValidator> accessValidators = ServiceProvider.load(ServiceProvider.ACL_VALIDATOR_ID, AccessValidator.class);
    if (accessValidators == null || accessValidators.isEmpty()) {
        log.info("The broker dose not load the AccessValidator");
        return;
    }
    for (AccessValidator accessValidator: accessValidators) {
        final AccessValidator validator = accessValidator;
        //注冊服務端就的“鉤子”對象,對權限進行校驗
        this.registerServerRPCHook(new RPCHook() {
            @Override
            public void doBeforeRequest(String remoteAddr, RemotingCommand request) {
                //Do not catch the exception
                validator.validate(validator.parse(request, remoteAddr));
            }
            @Override
            public void doAfterResponse(String remoteAddr, RemotingCommand request, RemotingCommand response) {
            }
        });
    }
}

源碼中有相關的注解,我們查看一下注冊registerServerRPCHook方法

public void registerServerRPCHook(RPCHook rpcHook) {
	//服務端的NettyRemotingServer服務注冊“鉤子”函數
    getRemotingServer().registerRPCHook(rpcHook);
    this.fastRemotingServer.registerRPCHook(rpcHook);
}

關于NettyRemotingServer服務和NettyRemotingClient服務配合使用,后面章節RocketMQ Remoting會重點分析

2.2、 PlainAccessValidator權限驗證器

PlainAccessValidator.parse(),根據客戶端不同的請求Code其需要的檢驗資源也不一樣

switch (request.getCode()) {
	//發送消息需要校驗當前的賬戶的topic是否具有PUB權限
    case RequestCode.SEND_MESSAGE:
        accessResource.addResourceAndPerm(request.getExtFields().get("topic"), Permission.PUB);
        break;
    case RequestCode.SEND_MESSAGE_V2:
        accessResource.addResourceAndPerm(request.getExtFields().get("b"), Permission.PUB);
        break;
    case RequestCode.CONSUMER_SEND_MSG_BACK:
        accessResource.addResourceAndPerm(request.getExtFields().get("originTopic"), Permission.PUB);
        accessResource.addResourceAndPerm(getRetryTopic(request.getExtFields().get("group")), Permission.SUB);
        break;
    //拉取消息時需要知道該consumer賬戶下拉取的topic是否具有SUB權限,并且還要知道訂閱組consumerGroup是否有sub權限
    case RequestCode.PULL_MESSAGE:
        accessResource.addResourceAndPerm(request.getExtFields().get("topic"), Permission.SUB);
        accessResource.addResourceAndPerm(getRetryTopic(request.getExtFields().get("consumerGroup")), Permission.SUB);
        break;
    case RequestCode.QUERY_MESSAGE:
        accessResource.addResourceAndPerm(request.getExtFields().get("topic"), Permission.SUB);
        break;
    case RequestCode.HEART_BEAT:
        HeartbeatData heartbeatData = HeartbeatData.decode(request.getBody(), HeartbeatData.class);
        for (ConsumerData data : heartbeatData.getConsumerDataSet()) {
            accessResource.addResourceAndPerm(getRetryTopic(data.getGroupName()), Permission.SUB);
            for (SubscriptionData subscriptionData : data.getSubscriptionDataSet()) {
                accessResource.addResourceAndPerm(subscriptionData.getTopic(), Permission.SUB);
            }
        }
        break;
    case RequestCode.UNREGISTER_CLIENT:
        final UnregisterClientRequestHeader unregisterClientRequestHeader =
            (UnregisterClientRequestHeader) request
                .decodeCommandCustomHeader(UnregisterClientRequestHeader.class);
        accessResource.addResourceAndPerm(getRetryTopic(unregisterClientRequestHeader.getConsumerGroup()), Permission.SUB);
        break;
    case RequestCode.GET_CONSUMER_LIST_BY_GROUP:
        final GetConsumerListByGroupRequestHeader getConsumerListByGroupRequestHeader =
            (GetConsumerListByGroupRequestHeader) request
                .decodeCommandCustomHeader(GetConsumerListByGroupRequestHeader.class);
        accessResource.addResourceAndPerm(getRetryTopic(getConsumerListByGroupRequestHeader.getConsumerGroup()), Permission.SUB);
        break;
    case RequestCode.UPDATE_CONSUMER_OFFSET:
        final UpdateConsumerOffsetRequestHeader updateConsumerOffsetRequestHeader =
            (UpdateConsumerOffsetRequestHeader) request
                .decodeCommandCustomHeader(UpdateConsumerOffsetRequestHeader.class);
        accessResource.addResourceAndPerm(getRetryTopic(updateConsumerOffsetRequestHeader.getConsumerGroup()), Permission.SUB);
        accessResource.addResourceAndPerm(updateConsumerOffsetRequestHeader.getTopic(), Permission.SUB);
        break;
    default:
        break;

}

根據request.getCode()獲取當前的操作需要的權限標識集合,供后面與系統的權限配置文件plain_acl.yml中的權限標識符校驗時使用

2.3、PlainPermissionLoader資源加載器

Broker初始化相關服務的時候創建了PlainAccessValidator,我們發現其默認的構造方法中調用了其權限資源加載器PlainPermissionLoader

public PlainAccessValidator() {
    aclPlugEngine = new PlainPermissionLoader();
}

創建PlainPermissionLoader對象

public PlainPermissionLoader() {
	//加載服務端的權限文件plain_acl.yml
    load();
    //開啟線程每500ms檢測權限文件是否改變,若改變則執行load()從新加載權限文件
    watch();
}

查看load方法流程

public void load() {
    Map<String, PlainAccessResource> plainAccessResourceMap = new HashMap<>();
    List<RemoteAddressStrategy> globalWhiteRemoteAddressStrategy = new ArrayList<>();

    JSONObject plainAclConfData = AclUtils.getYamlDataObject(fileHome + File.separator + fileName,
        JSONObject.class);

    if (plainAclConfData == null || plainAclConfData.isEmpty()) {
        throw new AclException(String.format("%s file  is not data", fileHome + File.separator + fileName));
    }
    log.info("Broker plain acl conf data is : ", plainAclConfData.toString());
    //獲取全局白名單IP集合
    JSONArray globalWhiteRemoteAddressesList = plainAclConfData.getJSONArray("globalWhiteRemoteAddresses");
    if (globalWhiteRemoteAddressesList != null && !globalWhiteRemoteAddressesList.isEmpty()) {
        for (int i = 0; i < globalWhiteRemoteAddressesList.size(); i++) {
            globalWhiteRemoteAddressStrategy.add(remoteAddressStrategyFactory.
                    getRemoteAddressStrategy(globalWhiteRemoteAddressesList.getString(i)));
        }
    }
    //獲取賬戶權限集合
    JSONArray accounts = plainAclConfData.getJSONArray("accounts");
    if (accounts != null && !accounts.isEmpty()) {
        List<PlainAccessConfig> plainAccessConfigList = accounts.toJavaList(PlainAccessConfig.class);
        for (PlainAccessConfig plainAccessConfig : plainAccessConfigList) {
        	//構建每個賬戶的權限資源
            PlainAccessResource plainAccessResource = buildPlainAccessResource(plainAccessConfig);
            //放入Map中AccessKey作為key,該賬戶的權限資源作為value
            plainAccessResourceMap.put(plainAccessResource.getAccessKey(),plainAccessResource);
        }
    }
    this.globalWhiteRemoteAddressStrategy = globalWhiteRemoteAddressStrategy;
    this.plainAccessResourceMap = plainAccessResourceMap;
}

加載資源文件,解析其中的權限標識,等待權限校驗器PlainAccessValidator調用其validate()對權限校驗

2.4、權限校驗流程

核心的校驗方法PlainPermissionLoader.validate()

public void validate(PlainAccessResource plainAccessResource) {

    //全局的白名單IP進行校驗
    for (RemoteAddressStrategy remoteAddressStrategy : globalWhiteRemoteAddressStrategy) {
        //匹配成功說明是全局的白名單IP,具有所有權限,直接返回。
    	if (remoteAddressStrategy.match(plainAccessResource)) {
            return;
        }
    }
    //判斷用戶名是否為空,null則拋出AclException異常
    if (plainAccessResource.getAccessKey() == null) {
        throw new AclException(String.format("No accessKey is configured"));
    }
    //校驗賬戶是否存在于服務端的權限資源文件中plain_acl.yml,不在則拋出異常
    if (!plainAccessResourceMap.containsKey(plainAccessResource.getAccessKey())) {
        throw new AclException(String.format("No acl config for %s", plainAccessResource.getAccessKey()));
    }
    PlainAccessResource ownedAccess = plainAccessResourceMap.get(plainAccessResource.getAccessKey());
    //檢查該賬戶的白名單IP是否匹配上客戶端IP,匹配成功具有所有權限,除UPDATE_AND_CREATE_TOPIC等特殊權限需要管理員權限
    if (ownedAccess.getRemoteAddressStrategy().match(plainAccessResource)) {
        return;
    }
    //校驗簽名
    String signature = AclUtils.calSignature(plainAccessResource.getContent(), ownedAccess.getSecretKey());
    if (!signature.equals(plainAccessResource.getSignature())) {
        throw new AclException(String.format("Check signature failed for accessKey=%s", plainAccessResource.getAccessKey()));
    }
    //校驗賬戶內的資源權限
    checkPerm(plainAccessResource, ownedAccess);
}

查看其對于當前賬戶內部的資源校驗

void checkPerm(PlainAccessResource needCheckedAccess, PlainAccessResource ownedAccess) {
	//判斷請求的命令的Code是否需要管理員權限,并判斷該用戶是否是管理員
    if (Permission.needAdminPerm(needCheckedAccess.getRequestCode()) && !ownedAccess.isAdmin()) {
        throw new AclException(String.format("Need admin permission for request code=%d, but accessKey=%s is not", needCheckedAccess.getRequestCode(), ownedAccess.getAccessKey()));
    }
    Map<String, Byte> needCheckedPermMap = needCheckedAccess.getResourcePermMap();
    Map<String, Byte> ownedPermMap = ownedAccess.getResourcePermMap();

    if (needCheckedPermMap == null) {
        // If the needCheckedPermMap is null,then return
        return;
    }
    for (Map.Entry<String, Byte> needCheckedEntry : needCheckedPermMap.entrySet()) {
        String resource = needCheckedEntry.getKey();
        Byte neededPerm = needCheckedEntry.getValue();
        //判斷是否是group,在構建resourcePermMap時候,group的key=RETRY_GROUP_TOPIC_PREFIX + consumerGroup
        boolean isGroup = PlainAccessResource.isRetryTopic(resource);
        //系統的權限配置文件中配置項包不含該客戶端命令請求需要的權限
        if (!ownedPermMap.containsKey(resource)) {
            //判斷其是否是topic還是group的權限標識,獲取該類型的全局的權限是什么
            byte ownedPerm = isGroup ? needCheckedAccess.getDefaultGroupPerm() :
                needCheckedAccess.getDefaultTopicPerm();
            //核對權限
            if (!Permission.checkPermission(neededPerm, ownedPerm)) {
                throw new AclException(String.format("No default permission for %s", PlainAccessResource.printStr(resource, isGroup)));
            }
            continue;
        }
        //系統的權限配置文件中配置項包含該客戶端命令請求需要的權限,則直接判斷其權限
        if (!Permission.checkPermission(neededPerm, ownedPermMap.get(resource))) {
            throw new AclException(String.format("No default permission for %s", PlainAccessResource.printStr(resource, isGroup)));
        }
    }
}

所有的檢驗流程如果有一項不滿足則拋出AclException異常

2.5、客戶端發送請求

上面圖中只是分析了Broker服務端的處理流程,客戶端如何調用我們具體分析下我們以發送消息為例:

我們之前分析過Producer的消息發送的核心方法是DefaultMQProducerImpl.sendKernelImpl()該方法

//是否注冊了“鉤子”
if (this.hasSendMessageHook()) {
    context = new SendMessageContext();
    context.setProducer(this);
    context.setProducerGroup(this.defaultMQProducer.getProducerGroup());
    context.setCommunicationMode(communicationMode);
    context.setBornHost(this.defaultMQProducer.getClientIP());
    context.setBrokerAddr(brokerAddr);
    context.setMessage(msg);
    context.setMq(mq);
    String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
    if (isTrans != null && isTrans.equals("true")) {
        context.setMsgType(MessageType.Trans_Msg_Half);
    }

    if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) {
        context.setMsgType(MessageType.Delay_Msg);
    }
    //封裝其ACL請求的參數信息
    this.executeSendMessageHookBefore(context);
}

hasSendMessageHook(),我們在構建Producer的時候創建了該對象,加入到DefaultMQProducerImpl的sendMessageHookList屬性中。

我們查看其發送消息NettyRemotingClient類中調用AclClientRPCHook.doBeforeRequest()發送前的數據準備

public void doBeforeRequest(String remoteAddr, RemotingCommand request) {
    byte[] total = AclUtils.combineRequestContent(request,
        parseRequestContent(request, sessionCredentials.getAccessKey(), sessionCredentials.getSecurityToken()));
    String signature = AclUtils.calSignature(total, sessionCredentials.getSecretKey());
    request.addExtField(SIGNATURE, signature);
    request.addExtField(ACCESS_KEY, sessionCredentials.getAccessKey());
    
    // The SecurityToken value is unneccessary,user can choose this one.
    if (sessionCredentials.getSecurityToken() != null) {
        request.addExtField(SECURITY_TOKEN, sessionCredentials.getSecurityToken());
    }
}

關于RocketMQ中怎么實現權限控制就分享到這里了,希望以上內容可以對大家有一定的幫助,可以學到更多知識。如果覺得文章不錯,可以把它分享出去讓更多的人看到。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

嵊州市| 宝应县| 界首市| 浙江省| 城口县| 铜川市| 云林县| 龙门县| 济南市| 龙岩市| 阿拉善左旗| 新河县| 昌宁县| 麻城市| 象州县| 金溪县| 涿州市| 南部县| 商丘市| 华安县| 光山县| 新晃| 和龙市| 都江堰市| 牡丹江市| 侯马市| 增城市| 忻城县| 察雅县| 剑川县| 衡东县| 永修县| 柳林县| 东乡族自治县| 和顺县| 历史| 江达县| 修水县| 阳城县| 商河县| 云南省|