redis訂閱內存過大的處理方法:
為監聽類自定義一個線程池即可,代碼如下:
package com.sec.ems.util;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
import org.springframework.util.Assert;
/**
* redis監聽發布類
*
*/
@Component("redisPubSub")
public class RedisPubSubImpl implements PublishSubcriber,InitializingBean {
private static final Logger LOGGER = LoggerFactory.getLogger(RedisPubSubImpl.class);
//redis模板
@Autowired
private StringRedisTemplate redisTemplate;
//初始化redis消息監聽容器
private RedisMessageListenerContainer listenerContainer = new RedisMessageListenerContainer();
@Autowired
private ThreadPoolTaskExecutor taskExecutor;
@Override
public void publish(String channel, String msg) {
Assert.notNull(channel, "Invalid channel:The channel is required.");
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Pubilish a message[{}] to channel[{}].",msg,channel);
}
//轉換發布消息
redisTemplate.convertAndSend(channel, msg);
}
@Override
public void subscribe(String channel, MessageListener listener) {
LOGGER.info("subscribe a channel:{}",channel);
listenerContainer.addMessageListener(listener, new ChannelTopic(channel));
}
/**
* 線程池配置
* @return
* 2019年11月13日
*/
@Bean
public ThreadPoolTaskExecutor taskExecutor(){
ThreadPoolTaskExecutor springSessionRedisTaskExecutor = new ThreadPoolTaskExecutor();
//核心線程
springSessionRedisTaskExecutor.setCorePoolSize(50);
//最大線程
springSessionRedisTaskExecutor.setMaxPoolSize(300);
//線程最大空閑時間
springSessionRedisTaskExecutor.setKeepAliveSeconds(10);
//隊列大小
springSessionRedisTaskExecutor.setQueueCapacity(1000);
//線程名稱前綴
springSessionRedisTaskExecutor.setThreadNamePrefix("Spring session redis executor thread: ");
return springSessionRedisTaskExecutor;
}
/**
* 初始化配置
*/
@Override
public void afterPropertiesSet() throws Exception {
//設置監聽容器的redis連接工廠類
listenerContainer.setConnectionFactory(redisTemplate.getConnectionFactory());
//監聽容器的初始化配置 線程池
listenerContainer.setTaskExecutor(taskExecutor);
listenerContainer.afterPropertiesSet();
//啟動監聽容器
listenerContainer.start();
if (LOGGER.isInfoEnabled()) {
LOGGER.info("isActive:{},autoRun:{},running:{}", listenerContainer.isActive(), listenerContainer.isAutoStartup(), listenerContainer.isRunning());
}
}
}