欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

Retrofit+Rxjava下载文件进度的实现

程序员文章站 2023-12-17 20:56:04
前言 最近在学习retrofit,虽然retrofit没有提供文件下载进度的回调,但是retrofit底层依赖的是okhttp,实际上所需要的实现okhttp对下载进度的...

前言

最近在学习retrofit,虽然retrofit没有提供文件下载进度的回调,但是retrofit底层依赖的是okhttp,实际上所需要的实现okhttp对下载进度的监听,在okhttp的官方demo中,有一个progress.java的文件,顾名思义。点我查看。

准备工作

本文采用dagger2,retrofit,rxjava。

compile'com.squareup.retrofit2:retrofit:2.0.2'
compile'com.squareup.retrofit2:converter-gson:2.0.2'
compile'com.squareup.retrofit2:adapter-rxjava:2.0.2'
//dagger2
compile'com.google.dagger:dagger:2.6'
apt'com.google.dagger:dagger-compiler:2.6'
//rxjava
compile'io.reactivex:rxandroid:1.2.0'
compile'io.reactivex:rxjava:1.1.5'
compile'com.jakewharton.rxbinding:rxbinding:0.4.0'

改造responsebody

okhttp3默认的responsebody因为不知道进度的相关信息,所以需要对其进行改造。可以使用接口监听进度信息。这里采用的是rxbus发送fileloadevent对象实现对下载进度的实时更新。这里先讲改造的progressresponsebody。

public class progressresponsebody extends responsebody {
 private responsebody responsebody;
 private bufferedsource bufferedsource;
 public progressresponsebody(responsebody responsebody) {
 this.responsebody = responsebody;
 }
 @override
 public mediatype contenttype() {
 return responsebody.contenttype();
 }
 @override
 public long contentlength() {
 return responsebody.contentlength();
 }
 @override
 public bufferedsource source() {
 if (bufferedsource == null) {
  bufferedsource = okio.buffer(source(responsebody.source()));
 }
 return bufferedsource;
 }
 private source source(source source) {
 return new forwardingsource(source) {
  long bytesreaded = 0;
  @override
  public long read(buffer sink, long bytecount) throws ioexception {
  long bytesread = super.read(sink, bytecount);
  bytesreaded += bytesread == -1 ? 0 : bytesread;
  //实时发送当前已读取的字节和总字节
  rxbus.getinstance().post(new fileloadevent(contentlength(), bytesreaded));
  return bytesread;
  }
 };
 }
}

呃,okio相关知识我也正在学,这个是从官方demo中copy的代码,只不过中间使用了rxbus实时发送fileloadevent对象。

fileloadevent

fileloadevent很简单,包含了当前已加载进度和文件总大小。

public class fileloadevent {
 long total;
 long bytesloaded;
 public long getbytesloaded() {
 return bytesloaded;
 }
 public long gettotal() {
 return total;
 }
 public fileloadevent(long total, long bytesloaded) {
 this.total = total;
 this.bytesloaded = bytesloaded;
 }
}

rxbus

rxbus 名字看起来像一个库,但它并不是一个库,而是一种模式,它的思想是使用 rxjava 来实现了 eventbus ,而让你不再需要使用otto或者 eventbus。点我查看详情。

public class rxbus {
 private static volatile rxbus minstance;
 private serializedsubject<object, object> msubject;
 private hashmap<string, compositesubscription> msubscriptionmap;
 /**
 * publishsubject只会把在订阅发生的时间点之后来自原始observable的数据发射给观察者
 * subject同时充当了observer和observable的角色,subject是非线程安全的,要避免该问题,
 * 需要将 subject转换为一个 serializedsubject ,上述rxbus类中把线程非安全的publishsubject包装成线程安全的subject。
 */
 private rxbus() {
 msubject = new serializedsubject<>(publishsubject.create());
 }
 /**
 * 单例 双重锁
 * @return
 */
 public static rxbus getinstance() {
 if (minstance == null) {
  synchronized (rxbus.class) {
  if (minstance == null) {
   minstance = new rxbus();
  }
  }
 }
 return minstance;
 }
 /**
 * 发送一个新的事件
 * @param o
 */
 public void post(object o) {
 msubject.onnext(o);
 }
 /**
 * 根据传递的 eventtype 类型返回特定类型(eventtype)的 被观察者
 * @param type
 * @param <t>
 * @return
 */
 public <t> observable<t> tobservable(final class<t> type) {
 //oftype操作符只发射指定类型的数据,其内部就是filter+cast
 return msubject.oftype(type);
 }
 public <t> subscription dosubscribe(class<t> type, action1<t> next, action1<throwable> error) {
 return tobservable(type)
  .onbackpressurebuffer()
  .subscribeon(schedulers.io())
  .observeon(androidschedulers.mainthread())
  .subscribe(next, error);
 }
 public void addsubscription(object o, subscription subscription) {
 if (msubscriptionmap == null) {
  msubscriptionmap = new hashmap<>();
 }
 string key = o.getclass().getname();
 if (msubscriptionmap.get(key) != null) {
  msubscriptionmap.get(key).add(subscription);
 } else {
  compositesubscription compositesubscription = new compositesubscription();
  compositesubscription.add(subscription);
  msubscriptionmap.put(key, compositesubscription);
  // log.e("air", "addsubscription:订阅成功 " );
 }
 }
 public void unsubscribe(object o) {
 if (msubscriptionmap == null) {
  return;
 }
 string key = o.getclass().getname();
 if (!msubscriptionmap.containskey(key)) {
  return;
 }
 if (msubscriptionmap.get(key) != null) {
  msubscriptionmap.get(key).unsubscribe();
 }
 msubscriptionmap.remove(key);
 //log.e("air", "unsubscribe: 取消订阅" );
 }
}

filecallback

那么,重点来了。代码其实有5个方法需要重写,好吧,其实这些方法可以精简一下。其中progress()方法有两个参数,progress和total,分别表示文件已下载的大小和总大小,我们将这两个参数不断更新到ui上就行了。

public abstract class filecallback<t> {
 private string destfiledir;
 private string destfilename;
 public filecallback(string destfiledir, string destfilename) {
 this.destfiledir = destfiledir;
 this.destfilename = destfilename;
 subscribeloadprogress();
 }
 public abstract void onsuccess(t t);
 public abstract void progress(long progress, long total);
 public abstract void onstart();
 public abstract void oncompleted();
 public abstract void onerror(throwable e);
 public void savefile(responsebody body) {
 inputstream is = null;
 byte[] buf = new byte[2048];
 int len;
 fileoutputstream fos = null;
 try {
  is = body.bytestream();
  file dir = new file(destfiledir);
  if (!dir.exists()) {
  dir.mkdirs();
  }
  file file = new file(dir, destfilename);
  fos = new fileoutputstream(file);
  while ((len = is.read(buf)) != -1) {
  fos.write(buf, 0, len);
  }
  fos.flush();
  unsubscribe();
  //oncompleted();
 } catch (filenotfoundexception e) {
  e.printstacktrace();
 } catch (ioexception e) {
  e.printstacktrace();
 } finally {
  try {
  if (is != null) is.close();
  if (fos != null) fos.close();
  } catch (ioexception e) {
  log.e("savefile", e.getmessage());
  }
 }
 }
 /**
 * 订阅加载的进度条
 */
 public void subscribeloadprogress() {
 subscription subscription = rxbus.getinstance().dosubscribe(fileloadevent.class, new action1<fileloadevent>() {
  @override
  public void call(fileloadevent fileloadevent) {
  progress(fileloadevent.getbytesloaded(),fileloadevent.gettotal());
  }
 }, new action1<throwable>() {
  @override
  public void call(throwable throwable) {
  //todo 对异常的处理
  }
 });
 rxbus.getinstance().addsubscription(this, subscription);
 }
 /**
 * 取消订阅,防止内存泄漏
 */
 public void unsubscribe() {
 rxbus.getinstance().unsubscribe(this);
 }
}

开始下载

使用自己的progressresponsebody

通过okhttpclient的拦截器去拦截response,并将我们的progressreponsebody设置进去监听进度。

public class progressinterceptor implements interceptor {
 @override
 public response intercept(chain chain) throws ioexception {
 response originalresponse = chain.proceed(chain.request());
 return originalresponse.newbuilder()
  .body(new progressresponsebody(originalresponse.body()))
  .build();
 }
}

构建retrofit

@module
public class apimodule {
 @provides
 @singleton
 public okhttpclient provideclient() {
 okhttpclient client = new okhttpclient.builder()
  .addinterceptor(new progressinterceptor())
  .build();
 return client;
 }
 @provides
 @singleton
 public retrofit provideretrofit(okhttpclient client){
 retrofit retrofit = new retrofit.builder()
  .client(client)
  .baseurl(constant.host)
  .addcalladapterfactory(rxjavacalladapterfactory.create())
  .addconverterfactory(gsonconverterfactory.create())
  .build();
 return retrofit;
 }
 @provides
 @singleton
 public apiinfo provideapiinfo(retrofit retrofit){
 return retrofit.create(apiinfo.class);
 }
 @provides
 @singleton
 public apimanager provideapimanager(application application, apiinfo apiinfo){
 return new apimanager(application,apiinfo);
 }
}

请求接口

public interface apiinfo {
 @streaming
 @get
 observable<responsebody> download(@url string url);
}

执行请求

public void load(string url, final filecallback<responsebody> callback){
 apiinfo.download(url)
  .subscribeon(schedulers.io())//请求网络 在调度者的io线程
  .observeon(schedulers.io()) //指定线程保存文件
  .doonnext(new action1<responsebody>() {
   @override
   public void call(responsebody body) {
   callback.savefile(body);
   }
  })
  .observeon(androidschedulers.mainthread()) //在主线程中更新ui
  .subscribe(new filesubscriber<responsebody>(application,callback));
 }

在presenter层中执行网络请求。

通过v层依赖注入的presenter对象调用请求网络,请求网络后调用v层更新ui的操作。

public void load(string url){
 string filename = "app.apk";
 string filestoredir = environment.getexternalstoragedirectory().getabsolutepath();
 log.e(tag, "load: "+filestoredir.tostring() );
 filecallback<responsebody> callback = new filecallback<responsebody>(filestoredir,filename) {
  @override
  public void onsuccess(final responsebody responsebody) {
  toast.maketext(app.getinstance(),"下载文件成功",toast.length_short).show();
  }
  @override
  public void progress(long progress, long total) {
  ihomeview.update(total,progress);
  }
  @override
  public void onstart() {
  ihomeview.showloading();
  }
  @override
  public void oncompleted() {
  ihomeview.hideloading();
  }
  @override
  public void onerror(throwable e) {
  //todo: 对异常的一些处理
  e.printstacktrace();
  }
 };
 apimanager.load(url, callback);
 }

踩到的坑。

依赖的retrofit版本一定要保持一致!!!说多了都是泪啊。

保存文件时要使用rxjava的doonnext操作符,后续更新ui的操作切换到ui线程。

总结

看似代码很多,其实过程并不复杂:

在保存文件时,调用forwardingsource的read方法,通过rxbus发送实时的fileloadevent对象。

filecallback订阅rxbus发送的fileloadevent。通过接收到fileloadevent中的下载进度和文件总大小对ui进行更新。

在下载保存文件完成后,取消订阅,防止内存泄漏。

demo地址:https://github.com/airmiya/downloaddemo

上一篇:

下一篇: