欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

Rxjava功能操作符的使用方法详解

程序员文章站 2024-04-01 22:37:16
rxjava功能个人感觉很好用,里面的一些操作符很方便,rxjava有:被观察者,观察者,订阅者, 被观察者通过订阅者订阅观察者,从而实现观察者监听被观察者返回的数据...

rxjava功能个人感觉很好用,里面的一些操作符很方便,rxjava有:被观察者,观察者,订阅者,

被观察者通过订阅者订阅观察者,从而实现观察者监听被观察者返回的数据

下面把rxjava常用的模型代码列出来,还有一些操作符的运用:

依赖:

compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
// because rxandroid releases are few and far between, it is recommended you also
// explicitly depend on rxjava's latest version for bug fixes and new features.
  compile 'io.reactivex.rxjava2:rxjava:2.1.5'

这个是另一种解析数据的方法,阿里巴巴旗下的,听说是解析最快的解析器。。。。

compile 'com.alibaba:fastjson:1.2.39'
import android.os.bundle;
import android.support.v7.app.appcompatactivity;
import android.view.view;
import android.widget.textview;
 
import com.alibaba.fastjson.jsonobject;
 
import java.io.ioexception;
import java.util.concurrent.timeunit;
 
import io.reactivex.backpressurestrategy;
import io.reactivex.flowable;
import io.reactivex.flowableemitter;
import io.reactivex.flowableonsubscribe;
import io.reactivex.observable;
import io.reactivex.observableemitter;
import io.reactivex.observableonsubscribe;
import io.reactivex.observer;
import io.reactivex.android.schedulers.androidschedulers;
import io.reactivex.annotations.nonnull;
import io.reactivex.disposables.disposable;
import io.reactivex.functions.bifunction;
import io.reactivex.functions.consumer;
import io.reactivex.functions.function;
import io.reactivex.schedulers.schedulers;
import okhttp3.call;
import okhttp3.callback;
import okhttp3.okhttpclient;
import okhttp3.request;
import okhttp3.response;
 
public class mainactivity extends appcompatactivity {
 
  private textview name;
 
  @override
  protected void oncreate(bundle savedinstancestate) {
    super.oncreate(savedinstancestate);
    setcontentview(r.layout.activity_main);
 
    name = (textview) findviewbyid(r.id.name);
    //用来调用下面的方法,监听。
    name.setonclicklistener(new view.onclicklistener() {
      @override
      public void onclick(view v) {
 
        interval();
      }
    });
  }
 
  //例1:observer
  public void observer() {
    //观察者
    observer<string> observer = new observer<string>() {
      @override
      public void onsubscribe(@nonnull disposable d) {
 
      }
      @override
      public void onnext(@nonnull string s) {
        //接收从被观察者中返回的数据
        system.out.println("onnext :" + s);
      }
      @override
      public void onerror(@nonnull throwable e) {
 
      }
      @override
      public void oncomplete() {
 
      }
    };
    //被观察者
    observable<string> observable = new observable<string>() {
      @override
      protected void subscribeactual(observer<!--? super string--> observer) {
        observer.onnext("11111");
        observer.onnext("22222");
        observer.oncomplete();
      }
    };
    //产生了订阅
    observable.subscribe(observer);
  }
 
  //例2:flowable
  private void flowable(){
    //被观察者
    flowable.create(new flowableonsubscribe<string>() {
      @override
      public void subscribe(@nonnull flowableemitter<string> e) throws exception {
        for (int i = 0; i < 100; i++) {
          e.onnext(i+"");
        }
      }
      //背压的策略,buffer缓冲区        观察者
      //背压一共给了五种策略
      // buffer、
      // drop、打印前128个,后面的删除
      // error、
      // latest、打印前128个和最后一个,其余删除
      // missing
      //这里的策略若不是buffer 那么,会出现著名的:missingbackpressureexception错误
    }, backpressurestrategy.buffer).subscribe(new consumer<string>() {
      @override
      public void accept(string s) throws exception {
        system.out.println("subscribe accept"+s);
        thread.sleep(1000);
      }
    });
  }
 
  //例3:线程调度器 scheduler
  public void flowable1(){
    flowable.create(new flowableonsubscribe<string>() {
      @override
      public void subscribe(@nonnull flowableemitter<string> e) throws exception {
        for (int i = 0; i < 100; i++) {
          //输出在哪个线程
          system.out.println("subscribe thread.currentthread.getname = " + thread.currentthread().getname());
          e.onnext(i+"");
        }
      }
    },backpressurestrategy.buffer)
        //被观察者一般放在子线程
        .subscribeon(schedulers.io())
        //观察者一般放在主线程
        .observeon(androidschedulers.mainthread())
        .subscribe(new consumer<string>() {
          @override
          public void accept(string s) throws exception {
            system.out.println("s"+ s);
            thread.sleep(100);
            //输出在哪个线程
            system.out.println("subscribe thread.currentthread.getname = " + thread.currentthread().getname());
          }
        });
  }
 
 
  //例4:http请求网络,map转化器,fastjson解析器
  public void map1(){
    observable.create(new observableonsubscribe<string>() {
      @override
      public void subscribe(@nonnull final observableemitter<string> e) throws exception {
        okhttpclient client = new okhttpclient();
        request request = new request.builder()
            .url("https://qhb.2dyt.com/bwei/login")
            .build();
        client.newcall(request).enqueue(new callback() {
          @override
          public void onfailure(call call, ioexception e) {
 
          }
 
          @override
          public void onresponse(call call, response response) throws ioexception {
            string result = response.body().string();
            e.onnext(result);
          }
        });
      }
    })
        //map转换器 flatmap(无序),concatmap(有序)
        .map(new function<string, bean="">() {
      @override
      public bean apply(@nonnull string s) throws exception {
        //用fastjson来解析数据
        return jsonobject.parseobject(s,bean.class);
      }
    }).subscribe(new consumer<bean>() {
      @override
      public void accept(bean bean) throws exception {
        system.out.println("bean = "+ bean.tostring() );
      }
    });
  }
 
  //常见rxjava操作符
  //例 定时发送消息
  public void interval(){
    observable.interval(2,1, timeunit.seconds)
        .take(10)
        .subscribe(new consumer<long>() {
          @override
          public void accept(long along) throws exception {
            system.out.println("along = " + along);
          }
        });
  }
 
 
  //例 zip字符串合并
  public void zip(){
    observable observable1 = observable.create(new observableonsubscribe<string>() {
      @override
      public void subscribe(@nonnull observableemitter<string> e) throws exception {
        e.onnext("1");
        e.onnext("2");
        e.onnext("3");
        e.onnext("4");
        e.oncomplete();
 
      }
    });
    observable observable2 = observable.create(new observableonsubscribe<string>() {
      @override
      public void subscribe(@nonnull observableemitter<string> e) throws exception {
        e.onnext("a");
        e.onnext("b");
        e.onnext("c");
        e.onnext("d");
        e.oncomplete();
      }
    });
 
    observable.zip(observable1, observable2, new bifunction<string,string,string>() {
      @override
      public string apply(@nonnull string o, @nonnull string o2) throws exception {
        return o + o2;
      }
    }).subscribe(new consumer<string>() {
      @override
      public void accept(string o) throws exception {
        system.out.println("o"+ o);
      }
    });
  }

总结

以上就是本文关于rxjava功能操作符的使用方法详解的全部内容,希望对大家有所帮助。感兴趣的朋友可以继续参阅本站:javaweb应用使用限流处理大量的并发请求详解、、java线程之线程同步synchronized和volatile详解等,有什么问题可以随时留言,小编会及时回复大家的。感谢朋友们对本站的支持!