中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

RxJava+Retrofit+OkHttp如何實現多文件下載之斷點續傳

發布時間:2021-08-06 11:12:59 來源:億速云 閱讀:165 作者:小新 欄目:移動開發

這篇文章給大家分享的是有關RxJava+Retrofit+OkHttp如何實現多文件下載之斷點續傳的內容。小編覺得挺實用的,因此分享給大家做個參考,一起跟隨小編過來看看吧。

效果

RxJava+Retrofit+OkHttp如何實現多文件下載之斷點續傳

實現

下載和之前的http請求可以相互獨立,所以我們單獨給download建立一個工程moudel處理

1.創建service接口

和以前一樣,先寫接口

注意:Streaming是判斷是否寫入內存的標示,如果小文件可以考慮不寫,一般情況必須寫;下載地址需要通過@url動態指定(不適固定的),@head標簽是指定下載的起始位置(斷點續傳的位置)

 /*斷點續傳下載接口*/
  @Streaming/*大文件需要加入這個判斷,防止下載過程中寫入到內存中*/
  @GET
  Observable<ResponseBody> download(@Header("RANGE") String start, @Url String url);

2.復寫ResponseBody

和之前的上傳封裝一樣,下載更加的需要進度,所以我們同樣覆蓋ResponseBody類,寫入進度監聽回調

/**
 * 自定義進度的body
 * @author wzg
 */
public class DownloadResponseBody extends ResponseBody {
  private ResponseBody responseBody;
  private DownloadProgressListener progressListener;
  private BufferedSource bufferedSource;

  public DownloadResponseBody(ResponseBody responseBody, DownloadProgressListener progressListener) {
    this.responseBody = responseBody;
    this.progressListener = progressListener;
  }

  @Override
  public BufferedSource source() {
    if (bufferedSource == null) {
      bufferedSource = Okio.buffer(source(responseBody.source()));
    }
    return bufferedSource;
  }

  private Source source(Source source) {
    return new ForwardingSource(source) {
      long totalBytesRead = 0L;
      @Override
      public long read(Buffer sink, long byteCount) throws IOException {
        long bytesRead = super.read(sink, byteCount);
        // read() returns the number of bytes read, or -1 if this source is exhausted.
        totalBytesRead += bytesRead != -1 ? bytesRead : 0;
        if (null != progressListener) {
          progressListener.update(totalBytesRead, responseBody.contentLength(), bytesRead == -1);
        }
        return bytesRead;
      }
    };
  }
}

3.自定義進度回調接口

/**
 * 成功回調處理
 * Created by WZG on 2016/10/20.
 */
public interface DownloadProgressListener {
  /**
   * 下載進度
   * @param read
   * @param count
   * @param done
   */
  void update(long read, long count, boolean done);
}

4.復寫Interceptor

復寫Interceptor,可以將我們的監聽回調通過okhttp的client方法addInterceptor自動加載我們的監聽回調和ResponseBody

/**
 * 成功回調處理
 * Created by WZG on 2016/10/20.
 */
public class DownloadInterceptor implements Interceptor {

  private DownloadProgressListener listener;

  public DownloadInterceptor(DownloadProgressListener listener) {
    this.listener = listener;
  }

  @Override
  public Response intercept(Chain chain) throws IOException {
    Response originalResponse = chain.proceed(chain.request());

    return originalResponse.newBuilder()
        .body(new DownloadResponseBody(originalResponse.body(), listener))
        .build();
  }
}

5.封裝請求downinfo數據

這個類中的數據可自由擴展,用戶自己選擇需要保持到數據庫中的數據,可以自由選擇需要數據庫第三方框架,demo采用greenDao框架存儲數據

public class DownInfo {
  /*存儲位置*/
  private String savePath;
  /*下載url*/
  private String url;
  /*基礎url*/
  private String baseUrl;
  /*文件總長度*/
  private long countLength;
  /*下載長度*/
  private long readLength;
  /*下載唯一的HttpService*/
  private HttpService service;
  /*回調監聽*/
  private HttpProgressOnNextListener listener;
  /*超時設置*/
  private int DEFAULT_TIMEOUT = 6;
  /*下載狀態*/
  private DownState state;
  }

6.DownState狀態封裝

很簡單,和大多數封裝框架一樣

public enum DownState {
  START,
  DOWN,
  PAUSE,
  STOP,
  ERROR,
  FINISH,
}

7.請求HttpProgressOnNextListener回調封裝類

注意:這里和DownloadProgressListener不同,這里是下載這個過程中的監聽回調,DownloadProgressListener只是進度的監聽

通過抽象類,可以自由選擇需要覆蓋的類,不需要完全覆蓋!更加靈活

/**
 * 下載過程中的回調處理
 * Created by WZG on 2016/10/20.
 */
public abstract class HttpProgressOnNextListener<T> {
  /**
   * 成功后回調方法
   * @param t
   */
  public abstract void onNext(T t);

  /**
   * 開始下載
   */
  public abstract void onStart();

  /**
   * 完成下載
   */
  public abstract void onComplete();


  /**
   * 下載進度
   * @param readLength
   * @param countLength
   */
  public abstract void updateProgress(long readLength, long countLength);

  /**
   * 失敗或者錯誤方法
   * 主動調用,更加靈活
   * @param e
   */
   public void onError(Throwable e){

   }

  /**
   * 暫停下載
   */
  public void onPuase(){

  }

  /**
   * 停止下載銷毀
   */
  public void onStop(){

  }
}

8.封裝回調Subscriber

準備的工作做完,需要將回調和傳入回調的信息統一封裝到sub中,統一判斷;和封裝二的原理一樣,我們通過自定義Subscriber來提前處理返回的數據,讓用戶字需要關系成功和失敗以及向關心的數據,避免重復多余的代碼出現在處理類中

  1. sub需要繼承DownloadProgressListener,和自帶的回調一起組成我們需要的回調結果

  2. 傳入DownInfo數據,通過回調設置DownInfo的不同狀態,保存狀態

  3. 通過RxAndroid將進度回調指定到主線程中(如果不需要進度最好去掉該處理避免主線程處理負擔)

  4. update進度回調在斷點續傳使用時,需要手動判斷斷點后加載的長度,因為指定斷點下載長度下載后總長度=(物理長度-起始下載長度)

/**
 * 用于在Http請求開始時,自動顯示一個ProgressDialog
 * 在Http請求結束是,關閉ProgressDialog
 * 調用者自己對請求數據進行處理
 * Created by WZG on 2016/7/16.
 */
public class ProgressDownSubscriber<T> extends Subscriber<T> implements DownloadProgressListener {
  //弱引用結果回調
  private WeakReference<HttpProgressOnNextListener> mSubscriberOnNextListener;
  /*下載數據*/
  private DownInfo downInfo;


  public ProgressDownSubscriber(DownInfo downInfo) {
    this.mSubscriberOnNextListener = new WeakReference<>(downInfo.getListener());
    this.downInfo=downInfo;
  }

  /**
   * 訂閱開始時調用
   * 顯示ProgressDialog
   */
  @Override
  public void onStart() {
    if(mSubscriberOnNextListener.get()!=null){
      mSubscriberOnNextListener.get().onStart();
    }
    downInfo.setState(DownState.START);
  }

  /**
   * 完成,隱藏ProgressDialog
   */
  @Override
  public void onCompleted() {
    if(mSubscriberOnNextListener.get()!=null){
      mSubscriberOnNextListener.get().onComplete();
    }
    downInfo.setState(DownState.FINISH);
  }

  /**
   * 對錯誤進行統一處理
   * 隱藏ProgressDialog
   *
   * @param e
   */
  @Override
  public void onError(Throwable e) {
    /*停止下載*/
    HttpDownManager.getInstance().stopDown(downInfo);
    if(mSubscriberOnNextListener.get()!=null){
      mSubscriberOnNextListener.get().onError(e);
    }
    downInfo.setState(DownState.ERROR);
  }

  /**
   * 將onNext方法中的返回結果交給Activity或Fragment自己處理
   *
   * @param t 創建Subscriber時的泛型類型
   */
  @Override
  public void onNext(T t) {
    if (mSubscriberOnNextListener.get() != null) {
      mSubscriberOnNextListener.get().onNext(t);
    }
  }

  @Override
  public void update(long read, long count, boolean done) {
    if(downInfo.getCountLength()>count){
      read=downInfo.getCountLength()-count+read;
    }else{
      downInfo.setCountLength(count);
    }
    downInfo.setReadLength(read);
    if (mSubscriberOnNextListener.get() != null) {
      /*接受進度消息,造成UI阻塞,如果不需要顯示進度可去掉實現邏輯,減少壓力*/
      rx.Observable.just(read).observeOn(AndroidSchedulers.mainThread())
          .subscribe(new Action1<Long>() {
        @Override
        public void call(Long aLong) {
           /*如果暫停或者停止狀態延遲,不需要繼續發送回調,影響顯示*/
          if(downInfo.getState()==DownState.PAUSE||downInfo.getState()==DownState.STOP)return;
          downInfo.setState(DownState.DOWN);
          mSubscriberOnNextListener.get().updateProgress(aLong,downInfo.getCountLength());
        }
      });
    }
  }

}

9.下載管理類封裝HttpDownManager

單利獲取

 /**
   * 獲取單例
   * @return
   */
  public static HttpDownManager getInstance() {
    if (INSTANCE == null) {
      synchronized (HttpDownManager.class) {
        if (INSTANCE == null) {
          INSTANCE = new HttpDownManager();
        }
      }
    }
    return INSTANCE;
  }

因為單利所以需要記錄正在下載的數據和回到sub

 /*回調sub隊列*/
  private HashMap<String,ProgressDownSubscriber> subMap;
  /*單利對象*/
  private volatile static HttpDownManager INSTANCE;

  private HttpDownManager(){
    downInfos=new HashSet<>();
    subMap=new HashMap<>();
  }

開始下載需要記錄下載的service避免每次都重復創建,然后請求sercie接口,得到ResponseBody數據后將數據流寫入到本地文件中(6.0系統后需要提前申請權限)

 /**
   * 開始下載
   */
  public void startDown(DownInfo info){
    /*正在下載不處理*/
    if(info==null||subMap.get(info.getUrl())!=null){
      return;
    }
    /*添加回調處理類*/
    ProgressDownSubscriber subscriber=new ProgressDownSubscriber(info);
    /*記錄回調sub*/
    subMap.put(info.getUrl(),subscriber);
    /*獲取service,多次請求公用一個sercie*/
    HttpService httpService;
    if(downInfos.contains(info)){
      httpService=info.getService();
    }else{
      DownloadInterceptor interceptor = new DownloadInterceptor(subscriber);
      OkHttpClient.Builder builder = new OkHttpClient.Builder();
      //手動創建一個OkHttpClient并設置超時時間
      builder.connectTimeout(info.getConnectionTime(), TimeUnit.SECONDS);
      builder.addInterceptor(interceptor);

      Retrofit retrofit = new Retrofit.Builder()
          .client(builder.build())
          .addConverterFactory(GsonConverterFactory.create())
          .addCallAdapterFactory(RxJavaCallAdapterFactory.create())
          .baseUrl(info.getBaseUrl())
          .build();
      httpService= retrofit.create(HttpService.class);
      info.setService(httpService);
    }
    /*得到rx對象-上一次下載的位置開始下載*/
    httpService.download("bytes=" + info.getReadLength() + "-",info.getUrl())
        /*指定線程*/
        .subscribeOn(Schedulers.io())
        .unsubscribeOn(Schedulers.io())
          /*失敗后的retry配置*/
        .retryWhen(new RetryWhenNetworkException())
        /*讀取下載寫入文件*/
        .map(new Func1<ResponseBody, DownInfo>() {
          @Override
          public DownInfo call(ResponseBody responseBody) {
            try {
              writeCache(responseBody,new File(info.getSavePath()),info);
            } catch (IOException e) {
              /*失敗拋出異常*/
              throw new HttpTimeException(e.getMessage());
            }
            return info;
          }
        })
        /*回調線程*/
        .observeOn(AndroidSchedulers.mainThread())
        /*數據回調*/
        .subscribe(subscriber);

  }

寫入文件

注意:一開始調用進度回調是第一次寫入在進度回調之前,所以需要判斷一次DownInfo是否獲取到下載總長度,沒有這選擇當前ResponseBody 讀取長度為總長度

  /**
   * 寫入文件
   * @param file
   * @param info
   * @throws IOException
   */
  public void writeCache(ResponseBody responseBody,File file,DownInfo info) throws IOException{
    if (!file.getParentFile().exists())
      file.getParentFile().mkdirs();
    long allLength;
    if (info.getCountLength()==0){
      allLength=responseBody.contentLength();
    }else{
      allLength=info.getCountLength();
    }
      FileChannel channelOut = null;
      RandomAccessFile randomAccessFile = null;
      randomAccessFile = new RandomAccessFile(file, "rwd");
      channelOut = randomAccessFile.getChannel();
      MappedByteBuffer mappedBuffer = channelOut.map(FileChannel.MapMode.READ_WRITE,
          info.getReadLength(),allLength-info.getReadLength());
      byte[] buffer = new byte[1024*8];
      int len;
      int record = 0;
      while ((len = responseBody.byteStream().read(buffer)) != -1) {
        mappedBuffer.put(buffer, 0, len);
        record += len;
      }
      responseBody.byteStream().close();
        if (channelOut != null) {
          channelOut.close();
        }
        if (randomAccessFile != null) {
          randomAccessFile.close();
        }
  }

停止下載

調用 subscriber.unsubscribe()解除監聽,然后remove記錄的下載數據和sub回調,并且設置下載狀態(同步數據庫自己添加)

/**
   * 停止下載
   */
  public void stopDown(DownInfo info){
    if(info==null)return;
    info.setState(DownState.STOP);
    info.getListener().onStop();
    if(subMap.containsKey(info.getUrl())) {
      ProgressDownSubscriber subscriber=subMap.get(info.getUrl());
      subscriber.unsubscribe();
      subMap.remove(info.getUrl());
    }
    /*同步數據庫*/
  }

暫停下載

原理和停止下載原理一樣

 /**
   * 暫停下載
   * @param info
   */
  public void pause(DownInfo info){
    if(info==null)return;
    info.setState(DownState.PAUSE);
    info.getListener().onPuase();
    if(subMap.containsKey(info.getUrl())){
      ProgressDownSubscriber subscriber=subMap.get(info.getUrl());
      subscriber.unsubscribe();
      subMap.remove(info.getUrl());
    }
    /*這里需要講info信息寫入到數據中,可自由擴展,用自己項目的數據庫*/
  }

暫停全部和停止全部下載任務
 

/**
   * 停止全部下載
   */
  public void stopAllDown(){
    for (DownInfo downInfo : downInfos) {
      stopDown(downInfo);
    }
    subMap.clear();
    downInfos.clear();
  }

  /**
   * 暫停全部下載
   */
  public void pauseAll(){
    for (DownInfo downInfo : downInfos) {
      pause(downInfo);
    }
    subMap.clear();
    downInfos.clear();
  }

感謝各位的閱讀!關于“RxJava+Retrofit+OkHttp如何實現多文件下載之斷點續傳”這篇文章就分享到這里了,希望以上內容可以對大家有一定的幫助,讓大家可以學到更多知識,如果覺得文章不錯,可以把它分享出去讓更多的人看到吧!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

临江市| 南通市| 衡阳市| 大新县| 易门县| 偏关县| 阿拉善右旗| 武山县| 喀喇| 渝中区| 万安县| 鄂伦春自治旗| 定安县| 大英县| 寿光市| 泉州市| 黄山市| 合川市| 鹤庆县| 宝坻区| 文成县| 邵武市| 高安市| 泗水县| 驻马店市| 固始县| 石棉县| 翁源县| 汕尾市| 溆浦县| 寻乌县| 宿松县| 阿克苏市| 达孜县| 永兴县| 左云县| 河北区| 麻城市| 孟村| 浮梁县| 镇安县|