您好,登錄后才能下訂單哦!
本文小編為大家詳細介紹“使用Redisson訂閱數問題怎么解決”,內容詳細,步驟清晰,細節處理妥當,希望這篇“使用Redisson訂閱數問題怎么解決”文章能幫助大家解決疑惑,下面跟著小編的思路慢慢深入,一起來學習新知識吧。
最近在使用分布式鎖redisson時遇到一個線上問題:發現是subscriptionsPerConnection or subscriptionConnectionPoolSize
的大小不夠,需要提高配置才能解決。
下面對其源碼進行分析,才能找到到底是什么邏輯導致問題所在:
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException { long threadId = Thread.currentThread().getId(); // 嘗試獲取,如果ttl == null,則表示獲取鎖成功 Long ttl = tryAcquire(leaseTime, unit, threadId); // lock acquired if (ttl == null) { return; } // 訂閱鎖釋放事件,并通過await方法阻塞等待鎖釋放,有效的解決了無效的鎖申請浪費資源的問題 RFuture<RedissonLockEntry> future = subscribe(threadId); if (interruptibly) { commandExecutor.syncSubscriptionInterrupted(future); } else { commandExecutor.syncSubscription(future); } // 后面代碼忽略 try { // 無限循環獲取鎖,直到獲取鎖成功 // ... } finally { // 取消訂閱鎖釋放事件 unsubscribe(future, threadId); } }
總結下主要邏輯:
獲取當前線程的線程id;
tryAquire嘗試獲取鎖,并返回ttl
如果ttl為空,則結束流程;否則進入后續邏輯;
this.subscribe(threadId)訂閱當前線程,返回一個RFuture;
如果在指定時間沒有監聽到,則會產生如上異常。
訂閱成功后, 通過while(true)循環,一直嘗試獲取鎖
fially代碼塊,會解除訂閱
所以上述這情況問題應該出現在subscribe()方法中
protected RFuture<RedissonLockEntry> subscribe(long threadId) { // entryName 格式:“id:name”; // channelName 格式:“redisson_lock__channel:name”; return pubSub.subscribe(getEntryName(), getChannelName()); }
RedissonLock#pubSub 是在RedissonLock構造函數中初始化的:
public RedissonLock(CommandAsyncExecutor commandExecutor, String name) { // .... this.pubSub = commandExecutor.getConnectionManager().getSubscribeService().getLockPubSub(); }
而subscribeService在MasterSlaveConnectionManager的實現中又是通過如下方式構造的
public MasterSlaveConnectionManager(MasterSlaveServersConfig cfg, Config config, UUID id) { this(config, id); this.config = cfg; // 初始化 initTimer(cfg); initSingleEntry(); } protected void initTimer(MasterSlaveServersConfig config) { int[] timeouts = new int[]{config.getRetryInterval(), config.getTimeout()}; Arrays.sort(timeouts); int minTimeout = timeouts[0]; if (minTimeout % 100 != 0) { minTimeout = (minTimeout % 100) / 2; } else if (minTimeout == 100) { minTimeout = 50; } else { minTimeout = 100; } timer = new HashedWheelTimer(new DefaultThreadFactory("redisson-timer"), minTimeout, TimeUnit.MILLISECONDS, 1024, false); connectionWatcher = new IdleConnectionWatcher(this, config); // 初始化:其中this就是MasterSlaveConnectionManager實例,config則為MasterSlaveServersConfig實例: subscribeService = new PublishSubscribeService(this, config); }
PublishSubscribeService構造函數
private final SemaphorePubSub semaphorePubSub = new SemaphorePubSub(this); public PublishSubscribeService(ConnectionManager connectionManager, MasterSlaveServersConfig config) { super(); this.connectionManager = connectionManager; this.config = config; for (int i = 0; i < locks.length; i++) { // 這里初始化了一組信號量,每個信號量的初始值為1 locks[i] = new AsyncSemaphore(1); } }
private final ConcurrentMap<String, E> entries = new ConcurrentHashMap<>(); public RFuture<E> subscribe(String entryName, String channelName) { // 從PublishSubscribeService獲取對應的信號量。 相同的channelName獲取的是同一個信號量 // public AsyncSemaphore getSemaphore(ChannelName channelName) { // return locks[Math.abs(channelName.hashCode() % locks.length)]; // } AsyncSemaphore semaphore = service.getSemaphore(new ChannelName(channelName)); AtomicReference<Runnable> listenerHolder = new AtomicReference<Runnable>(); RPromise<E> newPromise = new RedissonPromise<E>() { @Override public boolean cancel(boolean mayInterruptIfRunning) { return semaphore.remove(listenerHolder.get()); } }; Runnable listener = new Runnable() { @Override public void run() { // 如果存在RedissonLockEntry, 則直接利用已有的監聽 E entry = entries.get(entryName); if (entry != null) { entry.acquire(); semaphore.release(); entry.getPromise().onComplete(new TransferListener<E>(newPromise)); return; } E value = createEntry(newPromise); value.acquire(); E oldValue = entries.putIfAbsent(entryName, value); if (oldValue != null) { oldValue.acquire(); semaphore.release(); oldValue.getPromise().onComplete(new TransferListener<E>(newPromise)); return; } // 創建監聽, RedisPubSubListener<Object> listener = createListener(channelName, value); // 訂閱監聽 service.subscribe(LongCodec.INSTANCE, channelName, semaphore, listener); } }; // 最終會執行listener.run方法 semaphore.acquire(listener); listenerHolder.set(listener); return newPromise; }
AsyncSemaphore#acquire()方法
public void acquire(Runnable listener) { acquire(listener, 1); } public void acquire(Runnable listener, int permits) { boolean run = false; synchronized (this) { // counter初始化值為1 if (counter < permits) { // 如果不是第一次執行,則將listener加入到listeners集合中 listeners.add(new Entry(listener, permits)); return; } else { counter -= permits; run = true; } } // 第一次執行acquire, 才會執行listener.run()方法 if (run) { listener.run(); } }
梳理上述邏輯:
1、從PublishSubscribeService獲取對應的信號量, 相同的channelName獲取的是同一個信號量
2、如果是第一次請求,則會立馬執行listener.run()方法, 否則需要等上個線程獲取到該信號量執行完方能執行;
3、如果已經存在RedissonLockEntry, 則利用已經訂閱就行
4、如果不存在RedissonLockEntry, 則會創建新的RedissonLockEntry,然后進行。
從上面代碼看,主要邏輯是交給了PublishSubscribeService#subscribe方法
private final ConcurrentMap<ChannelName, PubSubConnectionEntry> name2PubSubConnection = new ConcurrentHashMap<>(); private final Queue<PubSubConnectionEntry> freePubSubConnections = new ConcurrentLinkedQueue<>(); public RFuture<PubSubConnectionEntry> subscribe(Codec codec, String channelName, AsyncSemaphore semaphore, RedisPubSubListener<?>... listeners) { RPromise<PubSubConnectionEntry> promise = new RedissonPromise<PubSubConnectionEntry>(); // 主要邏輯入口, 這里要主要channelName每次都是新對象, 但內部覆寫hashCode+equals。 subscribe(codec, new ChannelName(channelName), promise, PubSubType.SUBSCRIBE, semaphore, listeners); return promise; } private void subscribe(Codec codec, ChannelName channelName, RPromise<PubSubConnectionEntry> promise, PubSubType type, AsyncSemaphore lock, RedisPubSubListener<?>... listeners) { PubSubConnectionEntry connEntry = name2PubSubConnection.get(channelName); if (connEntry != null) { // 從已有Connection中取,如果存在直接把listeners加入到PubSubConnectionEntry中 addListeners(channelName, promise, type, lock, connEntry, listeners); return; } // 沒有時,才是最重要的邏輯 freePubSubLock.acquire(new Runnable() { @Override public void run() { if (promise.isDone()) { lock.release(); freePubSubLock.release(); return; } // 從隊列中取頭部元素 PubSubConnectionEntry freeEntry = freePubSubConnections.peek(); if (freeEntry == null) { // 第一次肯定是沒有的需要建立 connect(codec, channelName, promise, type, lock, listeners); return; } // 如果存在則嘗試獲取,如果remainFreeAmount小于0則拋出異常終止了。 int remainFreeAmount = freeEntry.tryAcquire(); if (remainFreeAmount == -1) { throw new IllegalStateException(); } PubSubConnectionEntry oldEntry = name2PubSubConnection.putIfAbsent(channelName, freeEntry); if (oldEntry != null) { freeEntry.release(); freePubSubLock.release(); addListeners(channelName, promise, type, lock, oldEntry, listeners); return; } // 如果remainFreeAmount=0, 則從隊列中移除 if (remainFreeAmount == 0) { freePubSubConnections.poll(); } freePubSubLock.release(); // 增加監聽 RFuture<Void> subscribeFuture = addListeners(channelName, promise, type, lock, freeEntry, listeners); ChannelFuture future; if (PubSubType.PSUBSCRIBE == type) { future = freeEntry.psubscribe(codec, channelName); } else { future = freeEntry.subscribe(codec, channelName); } future.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (!future.isSuccess()) { if (!promise.isDone()) { subscribeFuture.cancel(false); } return; } connectionManager.newTimeout(new TimerTask() { @Override public void run(Timeout timeout) throws Exception { subscribeFuture.cancel(false); } }, config.getTimeout(), TimeUnit.MILLISECONDS); } }); } }); } private void connect(Codec codec, ChannelName channelName, RPromise<PubSubConnectionEntry> promise, PubSubType type, AsyncSemaphore lock, RedisPubSubListener<?>... listeners) { // 根據channelName計算出slot獲取PubSubConnection int slot = connectionManager.calcSlot(channelName.getName()); RFuture<RedisPubSubConnection> connFuture = nextPubSubConnection(slot); promise.onComplete((res, e) -> { if (e != null) { ((RPromise<RedisPubSubConnection>) connFuture).tryFailure(e); } }); connFuture.onComplete((conn, e) -> { if (e != null) { freePubSubLock.release(); lock.release(); promise.tryFailure(e); return; } // 這里會從配置中讀取subscriptionsPerConnection PubSubConnectionEntry entry = new PubSubConnectionEntry(conn, config.getSubscriptionsPerConnection()); // 每獲取一次,subscriptionsPerConnection就會減直到為0 int remainFreeAmount = entry.tryAcquire(); // 如果舊的存在,則將現有的entry釋放,然后將listeners加入到oldEntry中 PubSubConnectionEntry oldEntry = name2PubSubConnection.putIfAbsent(channelName, entry); if (oldEntry != null) { releaseSubscribeConnection(slot, entry); freePubSubLock.release(); addListeners(channelName, promise, type, lock, oldEntry, listeners); return; } if (remainFreeAmount > 0) { // 加入到隊列中 freePubSubConnections.add(entry); } freePubSubLock.release(); RFuture<Void> subscribeFuture = addListeners(channelName, promise, type, lock, entry, listeners); // 這里真正的進行訂閱(底層與redis交互) ChannelFuture future; if (PubSubType.PSUBSCRIBE == type) { future = entry.psubscribe(codec, channelName); } else { future = entry.subscribe(codec, channelName); } future.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (!future.isSuccess()) { if (!promise.isDone()) { subscribeFuture.cancel(false); } return; } connectionManager.newTimeout(new TimerTask() { @Override public void run(Timeout timeout) throws Exception { subscribeFuture.cancel(false); } }, config.getTimeout(), TimeUnit.MILLISECONDS); } }); }); }
PubSubConnectionEntry#tryAcquire方法, subscriptionsPerConnection代表了每個連接的最大訂閱數。當tryAcqcurie的時候會減少這個數量:
public int tryAcquire() { while (true) { int value = subscribedChannelsAmount.get(); if (value == 0) { return -1; } if (subscribedChannelsAmount.compareAndSet(value, value - 1)) { return value - 1; } } }
梳理上述邏輯:
1、還是進行重復判斷, 根據channelName從name2PubSubConnection中獲取,看是否存在已經訂閱:PubSubConnectionEntry; 如果存在直接把新的listener加入到PubSubConnectionEntry。
2、從隊列freePubSubConnections中取公用的PubSubConnectionEntry, 如果沒有就進入connect()方法
2.1 會根據subscriptionsPerConnection創建PubSubConnectionEntry, 然后調用其tryAcquire()方法 - 每調用一次就會減1
2.2 將新的PubSubConnectionEntry放入全局的name2PubSubConnection, 方便后續重復使用;
2.3 同時也將PubSubConnectionEntry放入隊列freePubSubConnections中。- remainFreeAmount > 0
2.4 后面就是進行底層的subscribe和addListener
3、如果已經存在PubSubConnectionEntry,則利用已有的PubSubConnectionEntry進行tryAcquire;
4、如果remainFreeAmount < 0 會拋出IllegalStateException異常;如果remainFreeAmount=0,則會將其從隊列中移除, 那么后續請求會重新獲取一個可用的連接
5、最后也是進行底層的subscribe和addListener;
讀到這里,這篇“使用Redisson訂閱數問題怎么解決”文章已經介紹完畢,想要掌握這篇文章的知識點還需要大家自己動手實踐使用過才能領會,如果想了解更多相關內容的文章,歡迎關注億速云行業資訊頻道。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。