中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

RabbitMQ用多路由,多隊列來破除流控

發布時間:2021-06-22 14:34:35 來源:億速云 閱讀:184 作者:chen 欄目:大數據

本篇內容主要講解“RabbitMQ用多路由,多隊列來破除流控”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實用性強。下面就讓小編來帶大家學習“RabbitMQ用多路由,多隊列來破除流控”吧!

流控機制是我們在使用RabbitMQ最頭疼的問題,一旦并發激增時,消費者消費隊列消息就像滴水一樣慢。

現在我們下單后,需要給通知中心發送消息,讓通知中心通知服務商收取訂單,并確認提供服務。

我們先給Order接口添加一個發送消息的方法。

public interface Order {public void makeOrder(Order order);    public OrderSuccessResult getResult(Order order);    public void postOrder(Order order);}

實現類實現該方法

@Data@AllArgsConstructor@NoArgsConstructor@ServiceOrderVersion(value = 1)@RequiredArgsConstructorpublic class ServiceOrder extends AbstractOrder {private Long id;    @NonNull    private String code;    @NonNull    private Store store;    @NonNull    private ProviderService service;    @NonNull    private Car car;    @NonNull    private Date serviceDate;    @NonNull    private String contact;    @NonNull    private String contactTel;    private AppUser user;    @NonNull    private String content;    private int status;    private Date createDate;    @Override    public void makeOrder(Order order) {
        ServiceOrderDao serviceOrderDao = SpringBootUtil.getBean(ServiceOrderDao.class);        IdService idService = SpringBootUtil.getBean(IdService.class);        ((ServiceOrder)order).setId(idService.genId());        ((ServiceOrder)order).setCode(getCodeInfo(idService));        AppUser loginAppUser = AppUserUtil.getLoginAppUser();        AppUser user = new AppUser();        user.setId(loginAppUser.getId());        user.setUsername(loginAppUser.getUsername());        ((ServiceOrder)order).setUser(user);        ((ServiceOrder)order).setStatus(1);        ((ServiceOrder)order).setCreateDate(new Date());        serviceOrderDao.save((ServiceOrder) order);    }@Override    public OrderSuccessResult getResult(Order order) {
        ServiceOrderSuccessResultFactory orderSuccessResultFactory = SpringBootUtil.getBean(ServiceOrderSuccessResultFactory.class);        this.orderSuccessResult = orderSuccessResultFactory.getOrderSuccessResult();        return this.orderSuccessResult.getResult(order);    }@Override    public void postOrder(Order order) {
        MessageSender sender = SpringBootUtil.getBean(MessageSender.class);        CompletableFuture.runAsync(() ->sender.send(OwnerCarCenterMq.MQ_EXCHANGE_ORDER,                        OwnerCarCenterMq.ROUTING_KEY_ORDER,                        order)
        );    }private String getCodeInfo(IdService idService) {
        String flow = String.valueOf(idService.genId());        flow = flow.substring(14,flow.length());        String pre = DateUtils.format(new Date(), DateUtils.pattern9);        return pre + flow;    }
}

其中我們定義了這么一組隊列名,交換機,和路由

public interface OwnerCarCenterMq {/**     * 隊列名     */    String ORDER_QUEUE = "order";    /**     * 服務系統exchange名     */    String MQ_EXCHANGE_ORDER = "order.topic.exchange";    /**     * 服務添加routing key     */    String ROUTING_KEY_ORDER = "post.order";}

為了避免流控,我們定義了10個隊列,并全部綁定到一個交換機上。

@Configurationpublic class RabbitmqConfig {   @Bean   public List<Queue> orderQueues() {
      List<Queue> queues = new ArrayList<>();      for (int i = 1;i < 11;i++) {
         Queue queue = new Queue(OwnerCarCenterMq.ORDER_QUEUE + "_" + i);         queues.add(queue);      }      return queues;   }   @Bean   public TopicExchange orderExchange() {      return new TopicExchange(OwnerCarCenterMq.MQ_EXCHANGE_ORDER);   }   @Bean   public List<Binding> bindingOrders() {
      List<Binding> bindings = new ArrayList<>();      for (int i = 1;i < 11;i++) {
         Binding binding = BindingBuilder.bind(orderQueues().get(i - 1)).to(orderExchange())
               .with(OwnerCarCenterMq.ROUTING_KEY_ORDER + "_" + i);         bindings.add(binding);      }      return bindings;   }
}

重新封裝消息提供者,每次發送都隨機選取一個路由來進行發送。

@Slf4j@Componentpublic class MessageSender implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback {@Autowired    private RabbitTemplate rabbitTemplate;    public void send(String exchange,String routingKey,Object content) {log.info("send content=" + content);        this.rabbitTemplate.setMandatory(true);        this.rabbitTemplate.setConfirmCallback(this);        this.rabbitTemplate.setReturnCallback(this);        ThreadLocalRandom random = ThreadLocalRandom.current();        this.rabbitTemplate.convertAndSend(exchange,routingKey + "_" + random.nextInt(1,11),serialize(content));    }/**     * 確認后回調:     * @param correlationData     * @param ack     * @param cause     */    @Override    public void confirm(CorrelationData correlationData, boolean ack, String cause) {if (!ack) {log.info("send ack fail, cause = " + cause);        } else {log.info("send ack success");        }
    }/**     * 失敗后return回調:     *     * @param message     * @param replyCode     * @param replyText     * @param exchange     * @param routingKey     */    @Override    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {log.info("send fail return-message = " + new String(message.getBody()) + ", replyCode: " + replyCode + ", replyText: " + replyText + ", exchange: " + exchange + ", routingKey: " + routingKey);    }/**     * 對消息對象進行二進制序列化     * @param o     * @return     */    private byte[] serialize(Object o) {
        Kryo kryo = new Kryo();        ByteArrayOutputStream stream = new ByteArrayOutputStream();        Output output = new Output(stream);        kryo.writeObject(output, o);        output.close();        return stream.toByteArray();    }
}

我們可以看到在ServiceOrder里,我們是通過異步來進行發送到。

Controller如下

@Slf4j@RestControllerpublic class OrderController {private ThreadLocal<OrderFactory> orderFactory = new ThreadLocal<>();    private ThreadLocal<Order> orderService = new ThreadLocal<>();    @Autowired    private OrderBean orderBean;    @Transactional    @SuppressWarnings("unchecked")@PostMapping("/makeeorder")public Result<OrderSuccessResult> makeOrder(@RequestBody String orderStr, @RequestParam("type") String type) {log.info(orderStr);        Order order = setOrderFactory(orderStr,type);        orderService.get().makeOrder(order);        orderService.get().postOrder(order);        return Result.success(orderService.get().getResult(order));    }/**     * 判斷是哪一種類型的訂單來獲取哪一種類型的具體訂單工廠     * @param orderStr     * @return     */    private Order setOrderFactory(String orderStr,String type) {
        Class<?> classType = orderBean.getOrderMap().get(type);        Object order = JSONObject.parseObject(orderStr, classType);//        if (orderStr.contains("service")) {//            order = JSON.parseObject(orderStr, ServiceOrder.class);//        }else if (orderStr.contains("product")) {//            order = JSON.parseObject(orderStr, ProductOrder.class);//        }        Class<?> classFactoryType = orderBean.getOrderFactoryMap().get(type + "Factory");        this.orderFactory.set((OrderFactory) SpringBootUtil.getBean(classFactoryType));//        if (order instanceof ServiceOrder) {//            this.orderFactory.set(SpringBootUtil.getBean(ServiceOrderFactory.class));//        }else if (order instanceof ProductOrder) {//            this.orderFactory.set(SpringBootUtil.getBean(ProductOrderFactory.class));//        }        orderService.set(orderFactory.get().getOrder());        return (Order) order;    }
}

最后是在我們的通知中心模塊接收消息,同時對這10個隊列實行監控

@Slf4j@Component@RabbitListener(queues = {OwnerCarCenterMq.ORDER_QUEUE + "_" + 1,        OwnerCarCenterMq.ORDER_QUEUE + "_" + 2,        OwnerCarCenterMq.ORDER_QUEUE + "_" + 3,        OwnerCarCenterMq.ORDER_QUEUE + "_" + 4,        OwnerCarCenterMq.ORDER_QUEUE + "_" + 5,        OwnerCarCenterMq.ORDER_QUEUE + "_" + 6,        OwnerCarCenterMq.ORDER_QUEUE + "_" + 7,        OwnerCarCenterMq.ORDER_QUEUE + "_" + 8,        OwnerCarCenterMq.ORDER_QUEUE + "_" + 9,        OwnerCarCenterMq.ORDER_QUEUE + "_" + 10})public class ServiceOrderConsummer {@Getter    private Queue<ServiceOrder> serviceOrders = new ConcurrentLinkedDeque<>();    @RabbitHandler    public void receiceOrder(byte[] data, Channel channel, Message message) throws IOException {try {//告訴服務器收到這條消息 已經被我消費了 可以在隊列刪掉;否則消息服務器以為這條消息沒處理掉 后續還會在發            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);            ServiceOrder order = unSerialize(data);            this.serviceOrders.add(order);            log.info(String.valueOf(order));        } catch (IOException e) {
            e.printStackTrace();            //丟棄這條消息            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,false);            log.info("receiver fail");        }
    }/**     * 反序列化     * @param data     * @return     */    private ServiceOrder unSerialize(byte[] data) {
        Input input = null;        try {
            Kryo kryo = new Kryo();            input = new Input(new ByteArrayInputStream(data));            return kryo.readObject(input,ServiceOrder.class);        }finally {
            input.close();        }
    }
}

項目啟動后,我們可以看到rabbitmq的情況如下

RabbitMQ用多路由,多隊列來破除流控

RabbitMQ用多路由,多隊列來破除流控

現我們來對其進行壓測,啟動Jmeter,我們使用1000線程來進行壓測測試。各配置如下

RabbitMQ用多路由,多隊列來破除流控

RabbitMQ用多路由,多隊列來破除流控

RabbitMQ用多路由,多隊列來破除流控

保存文件上傳服務器,因為本人是華為云的服務器,故在服務器上進行壓測,不進行遠程壓測

在服務器的jmeter的bin目錄下輸入

./jmeter -n -t model/rabbit.jmx -l log.jtl

這里-n為不啟動圖形界面,-t使用我們上傳的配置文件,-l記錄日志

壓測結果如下

RabbitMQ用多路由,多隊列來破除流控

我們在壓測過程中來看一下rabbitmq的UI界面

RabbitMQ用多路由,多隊列來破除流控

RabbitMQ用多路由,多隊列來破除流控

消費基本上是實時的,沒有出現流控積壓現象。

到此,相信大家對“RabbitMQ用多路由,多隊列來破除流控”有了更深的了解,不妨來實際操作一番吧!這里是億速云網站,更多相關內容可以進入相關頻道進行查詢,關注我們,繼續學習!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

天等县| 新源县| 额尔古纳市| 开江县| 色达县| 松江区| 曲松县| 绥宁县| 武陟县| 泽库县| 崇明县| 宁陵县| 兴义市| 财经| 宁武县| 海宁市| 拉萨市| 汾西县| 苏州市| 崇仁县| 江山市| 余江县| 和龙市| 敦化市| 红桥区| 盐源县| 五原县| 靖边县| 赣州市| 昌平区| 龙山县| 阳城县| 博兴县| 乳源| 明溪县| 清镇市| 托克逊县| 余干县| 丹江口市| 武安市| 鄂伦春自治旗|