Rxjava功能操作符的使用方法详解
程序员文章站
2024-04-01 22:37:16
rxjava功能个人感觉很好用,里面的一些操作符很方便,rxjava有:被观察者,观察者,订阅者,
被观察者通过订阅者订阅观察者,从而实现观察者监听被观察者返回的数据...
rxjava功能个人感觉很好用,里面的一些操作符很方便,rxjava有:被观察者,观察者,订阅者,
被观察者通过订阅者订阅观察者,从而实现观察者监听被观察者返回的数据
下面把rxjava常用的模型代码列出来,还有一些操作符的运用:
依赖:
compile 'io.reactivex.rxjava2:rxandroid:2.0.1' // because rxandroid releases are few and far between, it is recommended you also // explicitly depend on rxjava's latest version for bug fixes and new features. compile 'io.reactivex.rxjava2:rxjava:2.1.5'
这个是另一种解析数据的方法,阿里巴巴旗下的,听说是解析最快的解析器。。。。
compile 'com.alibaba:fastjson:1.2.39'
import android.os.bundle; import android.support.v7.app.appcompatactivity; import android.view.view; import android.widget.textview; import com.alibaba.fastjson.jsonobject; import java.io.ioexception; import java.util.concurrent.timeunit; import io.reactivex.backpressurestrategy; import io.reactivex.flowable; import io.reactivex.flowableemitter; import io.reactivex.flowableonsubscribe; import io.reactivex.observable; import io.reactivex.observableemitter; import io.reactivex.observableonsubscribe; import io.reactivex.observer; import io.reactivex.android.schedulers.androidschedulers; import io.reactivex.annotations.nonnull; import io.reactivex.disposables.disposable; import io.reactivex.functions.bifunction; import io.reactivex.functions.consumer; import io.reactivex.functions.function; import io.reactivex.schedulers.schedulers; import okhttp3.call; import okhttp3.callback; import okhttp3.okhttpclient; import okhttp3.request; import okhttp3.response; public class mainactivity extends appcompatactivity { private textview name; @override protected void oncreate(bundle savedinstancestate) { super.oncreate(savedinstancestate); setcontentview(r.layout.activity_main); name = (textview) findviewbyid(r.id.name); //用来调用下面的方法,监听。 name.setonclicklistener(new view.onclicklistener() { @override public void onclick(view v) { interval(); } }); } //例1:observer public void observer() { //观察者 observer<string> observer = new observer<string>() { @override public void onsubscribe(@nonnull disposable d) { } @override public void onnext(@nonnull string s) { //接收从被观察者中返回的数据 system.out.println("onnext :" + s); } @override public void onerror(@nonnull throwable e) { } @override public void oncomplete() { } }; //被观察者 observable<string> observable = new observable<string>() { @override protected void subscribeactual(observer<!--? super string--> observer) { observer.onnext("11111"); observer.onnext("22222"); observer.oncomplete(); } }; //产生了订阅 observable.subscribe(observer); } //例2:flowable private void flowable(){ //被观察者 flowable.create(new flowableonsubscribe<string>() { @override public void subscribe(@nonnull flowableemitter<string> e) throws exception { for (int i = 0; i < 100; i++) { e.onnext(i+""); } } //背压的策略,buffer缓冲区 观察者 //背压一共给了五种策略 // buffer、 // drop、打印前128个,后面的删除 // error、 // latest、打印前128个和最后一个,其余删除 // missing //这里的策略若不是buffer 那么,会出现著名的:missingbackpressureexception错误 }, backpressurestrategy.buffer).subscribe(new consumer<string>() { @override public void accept(string s) throws exception { system.out.println("subscribe accept"+s); thread.sleep(1000); } }); } //例3:线程调度器 scheduler public void flowable1(){ flowable.create(new flowableonsubscribe<string>() { @override public void subscribe(@nonnull flowableemitter<string> e) throws exception { for (int i = 0; i < 100; i++) { //输出在哪个线程 system.out.println("subscribe thread.currentthread.getname = " + thread.currentthread().getname()); e.onnext(i+""); } } },backpressurestrategy.buffer) //被观察者一般放在子线程 .subscribeon(schedulers.io()) //观察者一般放在主线程 .observeon(androidschedulers.mainthread()) .subscribe(new consumer<string>() { @override public void accept(string s) throws exception { system.out.println("s"+ s); thread.sleep(100); //输出在哪个线程 system.out.println("subscribe thread.currentthread.getname = " + thread.currentthread().getname()); } }); } //例4:http请求网络,map转化器,fastjson解析器 public void map1(){ observable.create(new observableonsubscribe<string>() { @override public void subscribe(@nonnull final observableemitter<string> e) throws exception { okhttpclient client = new okhttpclient(); request request = new request.builder() .url("https://qhb.2dyt.com/bwei/login") .build(); client.newcall(request).enqueue(new callback() { @override public void onfailure(call call, ioexception e) { } @override public void onresponse(call call, response response) throws ioexception { string result = response.body().string(); e.onnext(result); } }); } }) //map转换器 flatmap(无序),concatmap(有序) .map(new function<string, bean="">() { @override public bean apply(@nonnull string s) throws exception { //用fastjson来解析数据 return jsonobject.parseobject(s,bean.class); } }).subscribe(new consumer<bean>() { @override public void accept(bean bean) throws exception { system.out.println("bean = "+ bean.tostring() ); } }); } //常见rxjava操作符 //例 定时发送消息 public void interval(){ observable.interval(2,1, timeunit.seconds) .take(10) .subscribe(new consumer<long>() { @override public void accept(long along) throws exception { system.out.println("along = " + along); } }); } //例 zip字符串合并 public void zip(){ observable observable1 = observable.create(new observableonsubscribe<string>() { @override public void subscribe(@nonnull observableemitter<string> e) throws exception { e.onnext("1"); e.onnext("2"); e.onnext("3"); e.onnext("4"); e.oncomplete(); } }); observable observable2 = observable.create(new observableonsubscribe<string>() { @override public void subscribe(@nonnull observableemitter<string> e) throws exception { e.onnext("a"); e.onnext("b"); e.onnext("c"); e.onnext("d"); e.oncomplete(); } }); observable.zip(observable1, observable2, new bifunction<string,string,string>() { @override public string apply(@nonnull string o, @nonnull string o2) throws exception { return o + o2; } }).subscribe(new consumer<string>() { @override public void accept(string o) throws exception { system.out.println("o"+ o); } }); }
总结
以上就是本文关于rxjava功能操作符的使用方法详解的全部内容,希望对大家有所帮助。感兴趣的朋友可以继续参阅本站:javaweb应用使用限流处理大量的并发请求详解、、java线程之线程同步synchronized和volatile详解等,有什么问题可以随时留言,小编会及时回复大家的。感谢朋友们对本站的支持!