RxJava+Retrofit+OkHttp实现多文件下载之断点续传
背景
断点续传下载一直是移动开发中必不可少的一项重要的技术,同样的rxjava和retrofit的结合让这个技术解决起来更加的灵活,我们完全可以封装一个适合自的下载框架,简单而且安全!
效果
实现
下载和之前的http请求可以相互独立,所以我们单独给download建立一个工程moudel处理
1.创建service接口
和以前一样,先写接口
注意:streaming是判断是否写入内存的标示,如果小文件可以考虑不写,一般情况必须写;下载地址需要通过@url动态指定(不适固定的),@head标签是指定下载的起始位置(断点续传的位置)
/*断点续传下载接口*/ @streaming/*大文件需要加入这个判断,防止下载过程中写入到内存中*/ @get observable<responsebody> download(@header("range") string start, @url string url);
2.复写responsebody
和之前的上传封装一样,下载更加的需要进度,所以我们同样覆盖responsebody类,写入进度监听回调
/** * 自定义进度的body * @author wzg */ public class downloadresponsebody extends responsebody { private responsebody responsebody; private downloadprogresslistener progresslistener; private bufferedsource bufferedsource; public downloadresponsebody(responsebody responsebody, downloadprogresslistener progresslistener) { this.responsebody = responsebody; this.progresslistener = progresslistener; } @override public bufferedsource source() { if (bufferedsource == null) { bufferedsource = okio.buffer(source(responsebody.source())); } return bufferedsource; } private source source(source source) { return new forwardingsource(source) { long totalbytesread = 0l; @override public long read(buffer sink, long bytecount) throws ioexception { long bytesread = super.read(sink, bytecount); // read() returns the number of bytes read, or -1 if this source is exhausted. totalbytesread += bytesread != -1 ? bytesread : 0; if (null != progresslistener) { progresslistener.update(totalbytesread, responsebody.contentlength(), bytesread == -1); } return bytesread; } }; } }
3.自定义进度回调接口
/** * 成功回调处理 * created by wzg on 2016/10/20. */ public interface downloadprogresslistener { /** * 下载进度 * @param read * @param count * @param done */ void update(long read, long count, boolean done); }
4.复写interceptor
复写interceptor,可以将我们的监听回调通过okhttp的client方法addinterceptor自动加载我们的监听回调和responsebody
/** * 成功回调处理 * created by wzg on 2016/10/20. */ public class downloadinterceptor implements interceptor { private downloadprogresslistener listener; public downloadinterceptor(downloadprogresslistener listener) { this.listener = listener; } @override public response intercept(chain chain) throws ioexception { response originalresponse = chain.proceed(chain.request()); return originalresponse.newbuilder() .body(new downloadresponsebody(originalresponse.body(), listener)) .build(); } }
5.封装请求downinfo数据
这个类中的数据可*扩展,用户自己选择需要保持到数据库中的数据,可以*选择需要数据库第三方框架,demo采用greendao框架存储数据
public class downinfo { /*存储位置*/ private string savepath; /*下载url*/ private string url; /*基础url*/ private string baseurl; /*文件总长度*/ private long countlength; /*下载长度*/ private long readlength; /*下载唯一的httpservice*/ private httpservice service; /*回调监听*/ private httpprogressonnextlistener listener; /*超时设置*/ private int default_timeout = 6; /*下载状态*/ private downstate state; }
6.downstate状态封装
很简单,和大多数封装框架一样
public enum downstate { start, down, pause, stop, error, finish, }
7.请求httpprogressonnextlistener回调封装类
注意:这里和downloadprogresslistener不同,这里是下载这个过程中的监听回调,downloadprogresslistener只是进度的监听
通过抽象类,可以*选择需要覆盖的类,不需要完全覆盖!更加灵活
/** * 下载过程中的回调处理 * created by wzg on 2016/10/20. */ public abstract class httpprogressonnextlistener<t> { /** * 成功后回调方法 * @param t */ public abstract void onnext(t t); /** * 开始下载 */ public abstract void onstart(); /** * 完成下载 */ public abstract void oncomplete(); /** * 下载进度 * @param readlength * @param countlength */ public abstract void updateprogress(long readlength, long countlength); /** * 失败或者错误方法 * 主动调用,更加灵活 * @param e */ public void onerror(throwable e){ } /** * 暂停下载 */ public void onpuase(){ } /** * 停止下载销毁 */ public void onstop(){ } }
8.封装回调subscriber
准备的工作做完,需要将回调和传入回调的信息统一封装到sub中,统一判断;和封装二的原理一样,我们通过自定义subscriber来提前处理返回的数据,让用户字需要关系成功和失败以及向关心的数据,避免重复多余的代码出现在处理类中
- sub需要继承downloadprogresslistener,和自带的回调一起组成我们需要的回调结果
- 传入downinfo数据,通过回调设置downinfo的不同状态,保存状态
- 通过rxandroid将进度回调指定到主线程中(如果不需要进度最好去掉该处理避免主线程处理负担)
- update进度回调在断点续传使用时,需要手动判断断点后加载的长度,因为指定断点下载长度下载后总长度=(物理长度-起始下载长度)
/** * 用于在http请求开始时,自动显示一个progressdialog * 在http请求结束是,关闭progressdialog * 调用者自己对请求数据进行处理 * created by wzg on 2016/7/16. */ public class progressdownsubscriber<t> extends subscriber<t> implements downloadprogresslistener { //弱引用结果回调 private weakreference<httpprogressonnextlistener> msubscriberonnextlistener; /*下载数据*/ private downinfo downinfo; public progressdownsubscriber(downinfo downinfo) { this.msubscriberonnextlistener = new weakreference<>(downinfo.getlistener()); this.downinfo=downinfo; } /** * 订阅开始时调用 * 显示progressdialog */ @override public void onstart() { if(msubscriberonnextlistener.get()!=null){ msubscriberonnextlistener.get().onstart(); } downinfo.setstate(downstate.start); } /** * 完成,隐藏progressdialog */ @override public void oncompleted() { if(msubscriberonnextlistener.get()!=null){ msubscriberonnextlistener.get().oncomplete(); } downinfo.setstate(downstate.finish); } /** * 对错误进行统一处理 * 隐藏progressdialog * * @param e */ @override public void onerror(throwable e) { /*停止下载*/ httpdownmanager.getinstance().stopdown(downinfo); if(msubscriberonnextlistener.get()!=null){ msubscriberonnextlistener.get().onerror(e); } downinfo.setstate(downstate.error); } /** * 将onnext方法中的返回结果交给activity或fragment自己处理 * * @param t 创建subscriber时的泛型类型 */ @override public void onnext(t t) { if (msubscriberonnextlistener.get() != null) { msubscriberonnextlistener.get().onnext(t); } } @override public void update(long read, long count, boolean done) { if(downinfo.getcountlength()>count){ read=downinfo.getcountlength()-count+read; }else{ downinfo.setcountlength(count); } downinfo.setreadlength(read); if (msubscriberonnextlistener.get() != null) { /*接受进度消息,造成ui阻塞,如果不需要显示进度可去掉实现逻辑,减少压力*/ rx.observable.just(read).observeon(androidschedulers.mainthread()) .subscribe(new action1<long>() { @override public void call(long along) { /*如果暂停或者停止状态延迟,不需要继续发送回调,影响显示*/ if(downinfo.getstate()==downstate.pause||downinfo.getstate()==downstate.stop)return; downinfo.setstate(downstate.down); msubscriberonnextlistener.get().updateprogress(along,downinfo.getcountlength()); } }); } } }
9.下载管理类封装httpdownmanager
单利获取
/** * 获取单例 * @return */ public static httpdownmanager getinstance() { if (instance == null) { synchronized (httpdownmanager.class) { if (instance == null) { instance = new httpdownmanager(); } } } return instance; }
因为单利所以需要记录正在下载的数据和回到sub
/*回调sub队列*/ private hashmap<string,progressdownsubscriber> submap; /*单利对象*/ private volatile static httpdownmanager instance; private httpdownmanager(){ downinfos=new hashset<>(); submap=new hashmap<>(); }
开始下载需要记录下载的service避免每次都重复创建,然后请求sercie接口,得到responsebody数据后将数据流写入到本地文件中(6.0系统后需要提前申请权限)
/** * 开始下载 */ public void startdown(downinfo info){ /*正在下载不处理*/ if(info==null||submap.get(info.geturl())!=null){ return; } /*添加回调处理类*/ progressdownsubscriber subscriber=new progressdownsubscriber(info); /*记录回调sub*/ submap.put(info.geturl(),subscriber); /*获取service,多次请求公用一个sercie*/ httpservice httpservice; if(downinfos.contains(info)){ httpservice=info.getservice(); }else{ downloadinterceptor interceptor = new downloadinterceptor(subscriber); okhttpclient.builder builder = new okhttpclient.builder(); //手动创建一个okhttpclient并设置超时时间 builder.connecttimeout(info.getconnectiontime(), timeunit.seconds); builder.addinterceptor(interceptor); retrofit retrofit = new retrofit.builder() .client(builder.build()) .addconverterfactory(gsonconverterfactory.create()) .addcalladapterfactory(rxjavacalladapterfactory.create()) .baseurl(info.getbaseurl()) .build(); httpservice= retrofit.create(httpservice.class); info.setservice(httpservice); } /*得到rx对象-上一次下載的位置開始下載*/ httpservice.download("bytes=" + info.getreadlength() + "-",info.geturl()) /*指定线程*/ .subscribeon(schedulers.io()) .unsubscribeon(schedulers.io()) /*失败后的retry配置*/ .retrywhen(new retrywhennetworkexception()) /*读取下载写入文件*/ .map(new func1<responsebody, downinfo>() { @override public downinfo call(responsebody responsebody) { try { writecache(responsebody,new file(info.getsavepath()),info); } catch (ioexception e) { /*失败抛出异常*/ throw new httptimeexception(e.getmessage()); } return info; } }) /*回调线程*/ .observeon(androidschedulers.mainthread()) /*数据回调*/ .subscribe(subscriber); }
写入文件
注意:一开始调用进度回调是第一次写入在进度回调之前,所以需要判断一次downinfo是否获取到下载总长度,没有这选择当前responsebody 读取长度为总长度
/** * 写入文件 * @param file * @param info * @throws ioexception */ public void writecache(responsebody responsebody,file file,downinfo info) throws ioexception{ if (!file.getparentfile().exists()) file.getparentfile().mkdirs(); long alllength; if (info.getcountlength()==0){ alllength=responsebody.contentlength(); }else{ alllength=info.getcountlength(); } filechannel channelout = null; randomaccessfile randomaccessfile = null; randomaccessfile = new randomaccessfile(file, "rwd"); channelout = randomaccessfile.getchannel(); mappedbytebuffer mappedbuffer = channelout.map(filechannel.mapmode.read_write, info.getreadlength(),alllength-info.getreadlength()); byte[] buffer = new byte[1024*8]; int len; int record = 0; while ((len = responsebody.bytestream().read(buffer)) != -1) { mappedbuffer.put(buffer, 0, len); record += len; } responsebody.bytestream().close(); if (channelout != null) { channelout.close(); } if (randomaccessfile != null) { randomaccessfile.close(); } }
停止下载
调用 subscriber.unsubscribe()解除监听,然后remove记录的下载数据和sub回调,并且设置下载状态(同步数据库自己添加)
/** * 停止下载 */ public void stopdown(downinfo info){ if(info==null)return; info.setstate(downstate.stop); info.getlistener().onstop(); if(submap.containskey(info.geturl())) { progressdownsubscriber subscriber=submap.get(info.geturl()); subscriber.unsubscribe(); submap.remove(info.geturl()); } /*同步数据库*/ }
暂停下载
原理和停止下载原理一样
/** * 暂停下载 * @param info */ public void pause(downinfo info){ if(info==null)return; info.setstate(downstate.pause); info.getlistener().onpuase(); if(submap.containskey(info.geturl())){ progressdownsubscriber subscriber=submap.get(info.geturl()); subscriber.unsubscribe(); submap.remove(info.geturl()); } /*这里需要讲info信息写入到数据中,可*扩展,用自己项目的数据库*/ }
暂停全部和停止全部下载任务
/** * 停止全部下载 */ public void stopalldown(){ for (downinfo downinfo : downinfos) { stopdown(downinfo); } submap.clear(); downinfos.clear(); } /** * 暂停全部下载 */ public void pauseall(){ for (downinfo downinfo : downinfos) { pause(downinfo); } submap.clear(); downinfos.clear(); }
整合代码httpdownmanager
同样使用了封装二中的retry处理和运行时异常自定义处理封装(不复述了)
补充
有同学说不知道数据库这块怎么替换,所以我加入了greendao框架去优化数据库存储,在实际运用中可以将这块的逻辑替换成你项目的数据库框架(之前用的都是realm,这回正好练练手)
只需要替换dbutil的方法即可
总结
到此我们的rxjava+retrofit+okhttp深入浅出-封装就基本完成了,已经可以完全胜任开发和学习的全部工作,如果后续再使用过程中有任何问题欢迎留言给我,会一直维护!
1.retrofit+rxjava+okhttp基本使用方法
2.统一处理请求数据格式
3.统一的progressdialog和回调subscriber处理
4.取消http请求
5.预处理http请求
6.返回数据的统一判断
7.失败后的retry封装处理
8.rxlifecycle管理生命周期,防止泄露
9.文件上传和文件下载(支持多文件断点续传)
源码:传送门-全部封装源码
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持。