Retrofit+Rxjava下载文件进度的实现
前言
最近在学习retrofit,虽然retrofit没有提供文件下载进度的回调,但是retrofit底层依赖的是okhttp,实际上所需要的实现okhttp对下载进度的监听,在okhttp的官方demo中,有一个progress.java的文件,顾名思义。点我查看。
准备工作
本文采用dagger2,retrofit,rxjava。
compile'com.squareup.retrofit2:retrofit:2.0.2' compile'com.squareup.retrofit2:converter-gson:2.0.2' compile'com.squareup.retrofit2:adapter-rxjava:2.0.2' //dagger2 compile'com.google.dagger:dagger:2.6' apt'com.google.dagger:dagger-compiler:2.6' //rxjava compile'io.reactivex:rxandroid:1.2.0' compile'io.reactivex:rxjava:1.1.5' compile'com.jakewharton.rxbinding:rxbinding:0.4.0'
改造responsebody
okhttp3默认的responsebody因为不知道进度的相关信息,所以需要对其进行改造。可以使用接口监听进度信息。这里采用的是rxbus发送fileloadevent对象实现对下载进度的实时更新。这里先讲改造的progressresponsebody。
public class progressresponsebody extends responsebody { private responsebody responsebody; private bufferedsource bufferedsource; public progressresponsebody(responsebody responsebody) { this.responsebody = responsebody; } @override public mediatype contenttype() { return responsebody.contenttype(); } @override public long contentlength() { return responsebody.contentlength(); } @override public bufferedsource source() { if (bufferedsource == null) { bufferedsource = okio.buffer(source(responsebody.source())); } return bufferedsource; } private source source(source source) { return new forwardingsource(source) { long bytesreaded = 0; @override public long read(buffer sink, long bytecount) throws ioexception { long bytesread = super.read(sink, bytecount); bytesreaded += bytesread == -1 ? 0 : bytesread; //实时发送当前已读取的字节和总字节 rxbus.getinstance().post(new fileloadevent(contentlength(), bytesreaded)); return bytesread; } }; } }
呃,okio相关知识我也正在学,这个是从官方demo中copy的代码,只不过中间使用了rxbus实时发送fileloadevent对象。
fileloadevent
fileloadevent很简单,包含了当前已加载进度和文件总大小。
public class fileloadevent { long total; long bytesloaded; public long getbytesloaded() { return bytesloaded; } public long gettotal() { return total; } public fileloadevent(long total, long bytesloaded) { this.total = total; this.bytesloaded = bytesloaded; } }
rxbus
rxbus 名字看起来像一个库,但它并不是一个库,而是一种模式,它的思想是使用 rxjava 来实现了 eventbus ,而让你不再需要使用otto或者 eventbus。点我查看详情。
public class rxbus { private static volatile rxbus minstance; private serializedsubject<object, object> msubject; private hashmap<string, compositesubscription> msubscriptionmap; /** * publishsubject只会把在订阅发生的时间点之后来自原始observable的数据发射给观察者 * subject同时充当了observer和observable的角色,subject是非线程安全的,要避免该问题, * 需要将 subject转换为一个 serializedsubject ,上述rxbus类中把线程非安全的publishsubject包装成线程安全的subject。 */ private rxbus() { msubject = new serializedsubject<>(publishsubject.create()); } /** * 单例 双重锁 * @return */ public static rxbus getinstance() { if (minstance == null) { synchronized (rxbus.class) { if (minstance == null) { minstance = new rxbus(); } } } return minstance; } /** * 发送一个新的事件 * @param o */ public void post(object o) { msubject.onnext(o); } /** * 根据传递的 eventtype 类型返回特定类型(eventtype)的 被观察者 * @param type * @param <t> * @return */ public <t> observable<t> tobservable(final class<t> type) { //oftype操作符只发射指定类型的数据,其内部就是filter+cast return msubject.oftype(type); } public <t> subscription dosubscribe(class<t> type, action1<t> next, action1<throwable> error) { return tobservable(type) .onbackpressurebuffer() .subscribeon(schedulers.io()) .observeon(androidschedulers.mainthread()) .subscribe(next, error); } public void addsubscription(object o, subscription subscription) { if (msubscriptionmap == null) { msubscriptionmap = new hashmap<>(); } string key = o.getclass().getname(); if (msubscriptionmap.get(key) != null) { msubscriptionmap.get(key).add(subscription); } else { compositesubscription compositesubscription = new compositesubscription(); compositesubscription.add(subscription); msubscriptionmap.put(key, compositesubscription); // log.e("air", "addsubscription:订阅成功 " ); } } public void unsubscribe(object o) { if (msubscriptionmap == null) { return; } string key = o.getclass().getname(); if (!msubscriptionmap.containskey(key)) { return; } if (msubscriptionmap.get(key) != null) { msubscriptionmap.get(key).unsubscribe(); } msubscriptionmap.remove(key); //log.e("air", "unsubscribe: 取消订阅" ); } }
filecallback
那么,重点来了。代码其实有5个方法需要重写,好吧,其实这些方法可以精简一下。其中progress()方法有两个参数,progress和total,分别表示文件已下载的大小和总大小,我们将这两个参数不断更新到ui上就行了。
public abstract class filecallback<t> { private string destfiledir; private string destfilename; public filecallback(string destfiledir, string destfilename) { this.destfiledir = destfiledir; this.destfilename = destfilename; subscribeloadprogress(); } public abstract void onsuccess(t t); public abstract void progress(long progress, long total); public abstract void onstart(); public abstract void oncompleted(); public abstract void onerror(throwable e); public void savefile(responsebody body) { inputstream is = null; byte[] buf = new byte[2048]; int len; fileoutputstream fos = null; try { is = body.bytestream(); file dir = new file(destfiledir); if (!dir.exists()) { dir.mkdirs(); } file file = new file(dir, destfilename); fos = new fileoutputstream(file); while ((len = is.read(buf)) != -1) { fos.write(buf, 0, len); } fos.flush(); unsubscribe(); //oncompleted(); } catch (filenotfoundexception e) { e.printstacktrace(); } catch (ioexception e) { e.printstacktrace(); } finally { try { if (is != null) is.close(); if (fos != null) fos.close(); } catch (ioexception e) { log.e("savefile", e.getmessage()); } } } /** * 订阅加载的进度条 */ public void subscribeloadprogress() { subscription subscription = rxbus.getinstance().dosubscribe(fileloadevent.class, new action1<fileloadevent>() { @override public void call(fileloadevent fileloadevent) { progress(fileloadevent.getbytesloaded(),fileloadevent.gettotal()); } }, new action1<throwable>() { @override public void call(throwable throwable) { //todo 对异常的处理 } }); rxbus.getinstance().addsubscription(this, subscription); } /** * 取消订阅,防止内存泄漏 */ public void unsubscribe() { rxbus.getinstance().unsubscribe(this); } }
开始下载
使用自己的progressresponsebody
通过okhttpclient的拦截器去拦截response,并将我们的progressreponsebody设置进去监听进度。
public class progressinterceptor implements interceptor { @override public response intercept(chain chain) throws ioexception { response originalresponse = chain.proceed(chain.request()); return originalresponse.newbuilder() .body(new progressresponsebody(originalresponse.body())) .build(); } }
构建retrofit
@module public class apimodule { @provides @singleton public okhttpclient provideclient() { okhttpclient client = new okhttpclient.builder() .addinterceptor(new progressinterceptor()) .build(); return client; } @provides @singleton public retrofit provideretrofit(okhttpclient client){ retrofit retrofit = new retrofit.builder() .client(client) .baseurl(constant.host) .addcalladapterfactory(rxjavacalladapterfactory.create()) .addconverterfactory(gsonconverterfactory.create()) .build(); return retrofit; } @provides @singleton public apiinfo provideapiinfo(retrofit retrofit){ return retrofit.create(apiinfo.class); } @provides @singleton public apimanager provideapimanager(application application, apiinfo apiinfo){ return new apimanager(application,apiinfo); } }
请求接口
public interface apiinfo { @streaming @get observable<responsebody> download(@url string url); }
执行请求
public void load(string url, final filecallback<responsebody> callback){ apiinfo.download(url) .subscribeon(schedulers.io())//请求网络 在调度者的io线程 .observeon(schedulers.io()) //指定线程保存文件 .doonnext(new action1<responsebody>() { @override public void call(responsebody body) { callback.savefile(body); } }) .observeon(androidschedulers.mainthread()) //在主线程中更新ui .subscribe(new filesubscriber<responsebody>(application,callback)); }
在presenter层中执行网络请求。
通过v层依赖注入的presenter对象调用请求网络,请求网络后调用v层更新ui的操作。
public void load(string url){ string filename = "app.apk"; string filestoredir = environment.getexternalstoragedirectory().getabsolutepath(); log.e(tag, "load: "+filestoredir.tostring() ); filecallback<responsebody> callback = new filecallback<responsebody>(filestoredir,filename) { @override public void onsuccess(final responsebody responsebody) { toast.maketext(app.getinstance(),"下载文件成功",toast.length_short).show(); } @override public void progress(long progress, long total) { ihomeview.update(total,progress); } @override public void onstart() { ihomeview.showloading(); } @override public void oncompleted() { ihomeview.hideloading(); } @override public void onerror(throwable e) { //todo: 对异常的一些处理 e.printstacktrace(); } }; apimanager.load(url, callback); }
踩到的坑。
依赖的retrofit版本一定要保持一致!!!说多了都是泪啊。
保存文件时要使用rxjava的doonnext操作符,后续更新ui的操作切换到ui线程。
总结
看似代码很多,其实过程并不复杂:
在保存文件时,调用forwardingsource的read方法,通过rxbus发送实时的fileloadevent对象。
filecallback订阅rxbus发送的fileloadevent。通过接收到fileloadevent中的下载进度和文件总大小对ui进行更新。
在下载保存文件完成后,取消订阅,防止内存泄漏。