您好,登錄后才能下訂單哦!
本文主要包括以下內容:
通過生產者-消費者模式保證數據鏈路的魯棒性
改進音頻錄制及播放,提高語音通信質量
采用多播實現設備發現及跨路由通信
實現對講進程與UI進程的通信(AIDL)
在《實時Android語音對講系統架構》對語音對講系統的數據鏈路的分析中提到,數據包要經過Record、Encoder、Transmission、Decoder、Play這一鏈條的處理,這種數據流轉就是對講機核心抽象,鑒于這種場景,采用了責任鏈設計模式。
在后續實踐中發現這樣的結構存在一些問題,責任鏈模式適用于數據即時流轉,需要整個鏈路沒有阻塞、等待。而在本應用場景中,編解碼及錄制播放均可能存在時間延遲,責任鏈模式無法兼顧網絡、編解碼的延時。
事實上,通過緩存隊列則可以保證數據鏈路的穩定性,分別在編解碼和數據發送接收時加入阻塞隊列,可以實現數據包的緩沖,同時降低丟包的可能。因此,在本系統場景下,基于阻塞隊列實現了生產者-消費者模式,是對責任鏈模式的優化,意在提高數據鏈路的魯棒性。
本節包括以下內容:
阻塞隊列(數據結構)
阻塞隊列實現生產者-消費者模式
阻塞隊列(數據結構)
阻塞隊列(BlockingQueue)是一個支持兩個附加操作的隊列。這兩個附加的操作是:
在隊列為空時,獲取元素的線程會等待隊列變為非空。
當隊列滿時,存儲元素的線程會等待隊列可用。
阻塞隊列常用于生產者和消費者的場景,生產者是往隊列里添加元素的線程,消費者是從隊列里拿元素的線程。阻塞隊列就是生產者存放元素的容器,而消費者也只從容器里拿元素。
阻塞隊列提供了四種處理方法:
方法\處理方式 | 拋出異常 | 返回特殊值 | 一直阻塞 | 超時退出 | |
---|---|---|---|---|---|
插入方法 | add(e) | offer(e) | put(e) | offer(e,time,unit) | |
移除方法 | remove() | poll() | take() | poll(time,unit) | |
檢查方法 | element() | peek() | 不可用 | 不可用 |
拋出異常:是指當阻塞隊列滿時候,再往隊列里插入元素,會拋出IllegalStateException("Queue full")異常。當隊列為空時,從隊列里獲取元素時會拋出NoSuchElementException異常 。
返回特殊值:插入方法會返回是否成功,成功則返回true。移除方法,則是從隊列里拿出一個元素,如果沒有則返回null
一直阻塞:當阻塞隊列滿時,如果生產者線程往隊列里put元素,隊列會一直阻塞生產者線程,直到拿到數據,或者響應中斷退出。當隊列空時,消費者線程試圖從隊列里take元素,隊列也會阻塞消費者線程,直到隊列可用。
超時退出:當阻塞隊列滿時,隊列會阻塞生產者線程一段時間,如果超過一定的時間,生產者線程就會退出。
本文通過LinkedBlockingQueue的put和take方法實現線程阻塞。LinkedBlockingQueue是一個用鏈表實現的有界阻塞隊列。此隊列的默認和最大長度為Integer.MAX_VALUE。此隊列按照先進先出的原則對元素進行排序。
首先看下LinkedBlockingQueue中核心的域:
static class Node<E> { E item; Node<E> next; Node(E x) { item = x; } }private final int capacity;private final AtomicInteger count = new AtomicInteger();transient Node<E> head;private transient Node<E> last;private final ReentrantLock takeLock = new ReentrantLock();private final Condition notEmpty = takeLock.newCondition();private final ReentrantLock putLock = new ReentrantLock();private final Condition notFull = putLock.newCondition();
LinkedBlockingQueue和LinkedList類似,通過靜態內部類Node<E>進行元素的存儲;
capacity表示阻塞隊列所能存儲的最大容量,在創建時可以手動指定最大容量,默認的最大容量為Integer.MAX_VALUE;
count表示當前隊列中的元素數量,LinkedBlockingQueue的入隊列和出隊列使用了兩個不同的lock對象,因此無論是在入隊列還是出隊列,都會涉及對元素數量的并發修改,因此這里使用了一個原子操作類來解決對同一個變量進行并發修改的線程安全問題。
head和last分別表示鏈表的頭部和尾部;
takeLock表示元素出隊列時線程所獲取的鎖,當執行take、poll等操作時線程獲取;notEmpty當隊列為空時,通過該Condition讓獲取元素的線程處于等待狀態;
putLock表示元素入隊列時線程所獲取的鎖,當執行put、offer等操作時獲取;notFull當隊列容量達到capacity時,通過該Condition讓加入元素的線程處于等待狀態。
其次,LinkedBlockingQueue有三個構造方法,分別如下:
public LinkedBlockingQueue() { this(Integer.MAX_VALUE); }public LinkedBlockingQueue(int capacity) { if (capacity <= 0) throw new IllegalArgumentException(); this.capacity = capacity; last = head = new Node<E>(null); }public LinkedBlockingQueue(Collection<? extends E> c) { this(Integer.MAX_VALUE); final ReentrantLock putLock = this.putLock; putLock.lock(); // Never contended, but necessary for visibility try { int n = 0; for (E e : c) { if (e == null) throw new NullPointerException(); if (n == capacity) throw new IllegalStateException("Queue full"); enqueue(new Node<E>(e)); ++n; } count.set(n); } finally { putLock.unlock(); } }
默認構造函數直接調用LinkedBlockingQueue(int capacity),LinkedBlockingQueue(int capacity)會初始化首尾節點,并置位null。LinkedBlockingQueue(Collection<? extends E> c)在初始化隊列的同時,將一個集合的全部元素加入隊列。
最后,重點分析下put和take的過程:
public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); int c = -1; Node<E> node = new Node<E>(e); final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; putLock.lockInterruptibly(); try { while (count.get() == capacity) { notFull.await(); } enqueue(node); c = count.getAndIncrement(); if (c + 1 < capacity) notFull.signal(); } finally { putLock.unlock(); } if (c == 0) signalNotEmpty(); }
public E take() throws InterruptedException { E x; int c = -1; final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; takeLock.lockInterruptibly(); try { while (count.get() == 0) { notEmpty.await(); } x = dequeue(); c = count.getAndDecrement(); if (c > 1) notEmpty.signal(); } finally { takeLock.unlock(); } if (c == capacity) signalNotFull(); return x; }
之所以把put和take放在一起,是因為它們是一對互逆的過程:
put在插入元素前首先獲得putLock和當前隊列的元素數量,take在去除元素錢首先獲得takeLock和當前隊列的元素數量;
put時需要判斷當前隊列是否已滿,已滿時當前線程進行等待,take時需要判斷隊列是否已空,隊列為空時當前線程進行等待;
put調用enqueue在隊尾插入元素,并修改尾指針,take調用dequeue將head指向原來first的位置,并將first的數據域置位null,實現刪除原first指針,并產生新的head,同時,切斷原head節點的引用,便于垃圾回收。
private void enqueue(Node<E> node) { last = last.next = node; }private E dequeue() { Node<E> h = head; Node<E> first = h.next; h.next = h; // help GChead = first; E x = first.item; first.item = null;return x; }
最后,put根據count決定是否觸發隊列未滿和隊列空;take根據count決定是否觸發隊列未空和隊列滿。
LinkedBlockingQueue在入隊列和出隊列時使用的是不同的Lock,這也意味著它們之間的操作不會存在互斥。在多個CPU的情況下,可以做到在同一時刻既消費、又生產,做到并行處理。
阻塞隊列實現生產者-消費者模式
通過對LinkedBlockingQueue主要源碼的分析,實現生產者-消費者模式就變得簡單了。
public class MessageQueue { private static MessageQueue messageQueue1, messageQueue2, messageQueue3, messageQueue4; private BlockingQueue<AudioData> audioDataQueue = null; private MessageQueue() { audioDataQueue = new LinkedBlockingQueue<>(); } @Retention(SOURCE) @IntDef({ENCODER_DATA_QUEUE, SENDER_DATA_QUEUE, DECODER_DATA_QUEUE, TRACKER_DATA_QUEUE}) public @interface DataQueueType { } public static final int ENCODER_DATA_QUEUE = 0; public static final int SENDER_DATA_QUEUE = 1; public static final int DECODER_DATA_QUEUE = 2; public static final int TRACKER_DATA_QUEUE = 3; public static MessageQueue getInstance(@DataQueueType int type) { switch (type) { case ENCODER_DATA_QUEUE: if (messageQueue1 == null) { messageQueue1 = new MessageQueue(); } return messageQueue1; case SENDER_DATA_QUEUE: if (messageQueue2 == null) { messageQueue2 = new MessageQueue(); } return messageQueue2; case DECODER_DATA_QUEUE: if (messageQueue3 == null) { messageQueue3 = new MessageQueue(); } return messageQueue3; case TRACKER_DATA_QUEUE: if (messageQueue4 == null) { messageQueue4 = new MessageQueue(); } return messageQueue4; default: return new MessageQueue(); } } public void put(AudioData audioData) { try { audioDataQueue.put(audioData); } catch (InterruptedException e) { e.printStackTrace(); } } public AudioData take() { try { return audioDataQueue.take(); } catch (InterruptedException e) { e.printStackTrace(); } return null; } }
這里通過@IntDef來實現限定輸入類型的功能,同時,阻塞隊列保持單實例,然后將隊列分別應用到各個生產者-消費者線程中。在本文的語音對講系統中,以音頻錄制線程和編碼線程為例,錄制線程是音頻數據包的生產者,編碼線程是音頻數據包的消費者。
音頻錄制線程:
@Overridepublic void run() { while (isRecording) { if (audioRecord.getRecordingState() == AudioRecord.RECORDSTATE_STOPPED) { audioRecord.startRecording(); } // 實例化音頻數據緩沖 short[] rawData = new short[inAudioBufferSize]; audioRecord.read(rawData, 0, inAudioBufferSize); AudioData audioData = new AudioData(rawData); MessageQueue.getInstance(MessageQueue.ENCODER_DATA_QUEUE).put(audioData); } }
編碼線程:
@Overridepublic void run() { AudioData data; // 在MessageQueue為空時,take方法阻塞 while ((data = MessageQueue.getInstance(MessageQueue.ENCODER_DATA_QUEUE).take()) != null) { data.setEncodedData(AudioDataUtil.raw2spx(data.getRawData())); MessageQueue.getInstance(MessageQueue.SENDER_DATA_QUEUE).put(data); }}
同樣的,編碼線程和發送線程,接收線程和解碼線程,解碼線程和播放線程同樣存在生產者-消費者的關系。
錄制,改變了音頻輸入源,將直接從麥克風(MIC)獲取改為MediaRecorder.AudioSource.VOICE_COMMUNICATION,VOICE_COMMUNICATION能自動回聲消除和增益,因此,屏蔽了speex在C層的降噪和增益。
播放,改變了音頻輸出端,將STREAM_MUSIC換成STREAM_VOICE_CALL,因為,對講機應用更類似于語音通信。換成STREAM_VOICE_CALL之后,遇到的問題是只能從聽筒聽到聲音,于是設置免提功能。
AudioManager audioManager =(AudioManager) getSystemService(Context.AUDIO_SERVICE); audioManager.setMode(AudioManager.MODE_IN_COMMUNICATION); audioManager.setSpeakerphoneOn(true);
該設置必須要開放修改音頻的權限,不然沒有效果。
<uses-permission android:name="android.permission.MODIFY_AUDIO_SETTINGS"/>
目前的語音通信質量,個人感覺仍然需要繼續優化,如果您有這方面的經驗(包括但不限于Java層和Speex音頻處理),不吝賜教!
《通過UDP廣播實現Android局域網Peer Discovering》中從編程的角度說明了TCP與UDP的區別,主要分析了TCP是面向連接的、可靠的服務,建立連接需要經過三次握手、銷毀連接需要四次揮手;UDP是無連接的傳輸層協議,提供面向事務的簡單不可靠信息傳送服務。
IP地址分為三類:單播、廣播和多播。廣播和多播僅用于UDP,它們用于將報文同時傳送給多個接收者。廣播分為:受限廣播、指向網絡的廣播、指向子網的廣播、指向所有子網的廣播。
舉個栗子:當前IP為10.13.200.16/22,首先廣播地址為255.255.255.255,子網廣播地址為10.13.203.255。
《通過UDP廣播實現Android局域網Peer Discovering》采用子網廣播實現局域網Android設備的發現,但在實踐中,一般路由器會禁止所有廣播跨路由器傳輸。所以,如果子網內有多個路由器,那么就無法實現設備發現了。因此,本文將設備發現也改為多播實現。多播組地址包括為1110的最高4bit和多播組號,范圍為224.0.0.0到239.255.255.255。能夠接收發往一個特定多播組地址數據的主機集合稱為主機組,主機組可以跨越多個網絡。
IANA 把224.0.0.0 到 224.0.0.255 范圍內的地址全部都保留給了路由協議和其他網絡維護功能。該范圍內的地址屬于局部范疇,不論生存時間字段(TTL)值是多少,都不會被路由器轉發;D類保留地址的完整的列表可以參見RFC1700。
224.0.1.0 到 238.255.255.255 地址范圍作為用戶組播地址,在全網范圍內有效。其中233/8 為 GLOP 地址。GLOP 是一種自治系統之間的組播地址分配機制,將 AS 號直接填入組播地址的中間兩個字節中,每個自治系統都可以得到 255 個組播地址;
239.0.0.0 到 239.255.255.255 地址范圍為本地管理組播地址(administratively scoped addresses),僅在特定的本地范圍內有效。
本文對比了子網廣播和多播,子網廣播地址為:192.168.137.255,多播組地址為:224.5.6.7。
發送接收采用同一MulticastSocket,MulticastSocket設置TTL,TTL表示跨網絡的級數。
try { inetAddress = InetAddress.getByName(Constants.MULTI_BROADCAST_IP); multicastSocket = new MulticastSocket(Constants.MULTI_BROADCAST_PORT); multicastSocket.setLoopbackMode(true); multicastSocket.joinGroup(inetAddress); multicastSocket.setTimeToLive(4); } catch (IOException e) { e.printStackTrace(); }
joinGroup涉及到另一個協議:網路群組管理協議(Internet Group Management Protocol或簡寫IGMP),通過抓包可以觀察到初始化MulticastSocket時加入組協議的報文。
setTimeToLive用于設置生存時間字段。默認情況下,多播數據報的TTL設置為1,使得多播數據報僅限于在同一個子網內傳送,更大的TTL值能夠被多播路由器轉發。在實際傳輸過程中,多播組地址仍然需要轉換為以太網地址。實際轉換規則這里不再贅述。
上述多播地址224.5.6.7轉換后為01:00:5e:05:06:07。
代碼層面上,探測線程將子網廣播改為多播實現。
if (command != null) { byte[] data = command.getBytes(); DatagramPacket datagramPacket = new DatagramPacket( data, data.length, Multicast.getMulticast().getInetAddress(), Constants.MULTI_BROADCAST_PORT); try { Multicast.getMulticast().getMulticastSocket().send(datagramPacket); } catch (IOException e) { e.printStackTrace(); } }
并且在接收端區分指令和音頻數據。
while (true) { // 設置接收緩沖段 byte[] receivedData = new byte[512]; DatagramPacket datagramPacket = new DatagramPacket(receivedData, receivedData.length); try { // 接收數據報文 Multicast.getMulticast().getMulticastSocket().receive(datagramPacket); } catch (IOException e) { e.printStackTrace(); } // 判斷數據報文類型,并做相應處理 if (datagramPacket.getLength() == Command.DISC_REQUEST.getBytes().length || datagramPacket.getLength() == Command.DISC_LEAVE.getBytes().length || datagramPacket.getLength() == Command.DISC_RESPONSE.getBytes().length) { handleCommandData(datagramPacket); } else { handleAudioData(datagramPacket); } }
在實際工程應用場景中,需要對講機進程即使切換到后臺,也依然能收到信息。因此,為了提高進程的優先級,降低被系統回收的概率,采用了在Service中訪問網絡服務,處理語音信息的發送和接收的方案。前臺Activity負責顯示組播組內用戶(上線和下線,更新頁面),通過AIDL與Service進行跨進程通信和回調。Service的清單說明如下:
<service android:name=".service.IntercomService" android:process=":intercom" />
:intercom表示定義子進程intercom。
使用多進程相比于常見的單進程,有一些需要注意的點:
靜態成員和單例模式失效。因為每個進程都會分配一個獨立的虛擬機,不同的虛擬機對應不同的地址空間;
線程同步機制失效。因此不同進程鎖的并不是同一個對象;
Application會多次創建。進程與Application對應,多進程會啟動多個Application。
因此,通過process定義了多進程之后,一定要避免單進程模式下對象共享的思路。另外,在AS中調試多進程應用的時候,斷點一定要針對不同的進程,以本文為例,添加斷點需要選擇主進程和intercom進程。給兩個進程分別添加調試斷點后,可以看到有兩個Debugger:3156和3230(由于存在Jni代碼,所以顯示了Hybrid Debugger)。
由于既存在Activity到Service的通信,也存在Service接收到消息之后更新Activity頁面的需求,所以這里采用了跨進程回調的方式。首先,AIDL方法如下:
package com.jd.wly.intercom.service;import com.jd.wly.intercom.service.IIntercomCallback;interface IIntercomService { void startRecord(); void stopRecord(); void registerCallback(IIntercomCallback callback); void unRegisterCallback(IIntercomCallback callback); }
package com.jd.wly.intercom.service;interface IIntercomCallback { void findNewUser(String ipAddress); void removeUser(String ipAddress); }
IIntercomService定義了Activity到Service的通信方法,包含啟動和停止音頻錄制,以及注冊和解除回調接口;IIntercomCallback定義了從Service到Activity的回調接口,用于在Service發現用戶上線、下線時通知前臺Activity的顯示。
AIDL文件的定義涉及一些規范:比如變量在同一包內也需要import,非基本數據類型參數列表需要指明in、out,自定義參數類型需要同時編寫java文件和aidl文件等,本文篇幅有限,就不具體展開AIDL跨進程通信的細節了。
Activity檢測用戶的按鍵操作,然后將事件傳遞給Service進行對應的邏輯處理。
將Service綁定到Activity首先需要定義ServiceConnection:
/** * onServiceConnected和onServiceDisconnected運行在UI線程中 */private IIntercomService intercomService;private ServiceConnection serviceConnection = new ServiceConnection() { @Override public void onServiceConnected(ComponentName name, IBinder service) { intercomService = IIntercomService.Stub.asInterface(service); try { intercomService.registerCallback(intercomCallback); } catch (RemoteException e) { e.printStackTrace(); } } @Override public void onServiceDisconnected(ComponentName name) { intercomService = null; } };
在onStart()時綁定Service,onStop()時解除回調和綁定。
@Overrideprotected void onStart() { super.onStart(); Intent intent = new Intent(AudioActivity.this, IntercomService.class); bindService(intent, serviceConnection, BIND_AUTO_CREATE); }
@Overrideprotected void onStop() { super.onStop(); if (intercomService != null && intercomService.asBinder().isBinderAlive()) { try { intercomService.unRegisterCallback(intercomCallback); } catch (RemoteException e) { e.printStackTrace(); } unbindService(serviceConnection); } }
Activity獲取了Service的服務后,分別在按鍵事件處理中進行調用。
@Overridepublic boolean onKeyDown(int keyCode, KeyEvent event) { if ((keyCode == KeyEvent.KEYCODE_F2 || keyCode == KeyEvent.KEYCODE_VOLUME_UP || keyCode == KeyEvent.KEYCODE_VOLUME_DOWN)) { try { intercomService.startRecord(); } catch (RemoteException e) { e.printStackTrace(); } return true; } return super.onKeyDown(keyCode, event); }@Overridepublic boolean onKeyUp(int keyCode, KeyEvent event) { if ((keyCode == KeyEvent.KEYCODE_F2 || keyCode == KeyEvent.KEYCODE_VOLUME_UP || keyCode == KeyEvent.KEYCODE_VOLUME_DOWN)) { try { intercomService.stopRecord(); } catch (RemoteException e) { e.printStackTrace(); } return true; } return super.onKeyUp(keyCode, event); }
startRecord和stopRecord的具體實現定義在Service中:
public IIntercomService.Stub mBinder = new IIntercomService.Stub() { @Override public void startRecord() throws RemoteException { if (!recorder.isRecording()) { recorder.setRecording(true); tracker.setPlaying(false); threadPool.execute(recorder); } } @Override public void stopRecord() throws RemoteException { if (recorder.isRecording()) { recorder.setRecording(false); tracker.setPlaying(true); } } @Override public void registerCallback(IIntercomCallback callback) throws RemoteException { mCallbackList.register(callback); } @Override public void unRegisterCallback(IIntercomCallback callback) throws RemoteException { mCallbackList.unregister(callback); } };
Service通過RemoteCallbackList保持回調方法,使用時首先定義RemoteCallbackList對象,泛型類型為IIntercomCallback。
private RemoteCallbackList<IIntercomCallback> mCallbackList = new RemoteCallbackList<>();
RemoteCallbackList并不是List,內部通過Map來保存,Key和Value分別為IBinder和Callback。
ArrayMap<IBinder, Callback> mCallbacks = new ArrayMap<IBinder, Callback>();
使用RemoteCallbackList回調Activity方法時,通過beginBroadcast獲取數量,
/** * 發現新的組播成員 * * @param ipAddress IP地址 */private void findNewUser(String ipAddress) { final int size = mCallbackList.beginBroadcast(); for (int i = 0; i < size; i++) { IIntercomCallback callback = mCallbackList.getBroadcastItem(i); if (callback != null) { try { callback.findNewUser(ipAddress); } catch (RemoteException e) { e.printStackTrace(); } } } mCallbackList.finishBroadcast(); }
removeUser(String ipAddress)方法與findNewUser(String ipAddress)方法類似。它們具體的實現在Activity中:
/** * 被調用的方法運行在Binder線程池中,不能更新UI */private IIntercomCallback intercomCallback = new IIntercomCallback.Stub() { @Override public void findNewUser(String ipAddress) throws RemoteException { sendMsg2MainThread(ipAddress, FOUND_NEW_USER); } @Override public void removeUser(String ipAddress) throws RemoteException { sendMsg2MainThread(ipAddress, REMOVE_USER); } };
需要注意的是,IIntercomCallback中的回調方法實現并不在UI線程中執行,如果需要更新UI,需要實現多線程調用,多線程依然通過Handler來實現
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。