当前位置: 首页 > news >正文

做网站来钱快/网店推广的渠道有哪些

做网站来钱快,网店推广的渠道有哪些,产品外观工业设计公司,才做的网站怎么搜不到目录什么是Rxjava1.简单实例2.操作符系列2.1 map操作符题外话题disposable什么是Rxjava ReactiveX是一个通过使用可观察序列来编写异步和基于事件的程序的库。 它扩展了观察者模式以支持数据和/或事件序列,并增加了运算符,使您可以声明性地组合序列&…

目录

  • 什么是Rxjava
  • 1.简单实例
  • 2.操作符系列
    • 2.1 map操作符
  • 题外话题disposable

什么是Rxjava

ReactiveX是一个通过使用可观察序列来编写异步和基于事件的程序的库。
它扩展了观察者模式以支持数据和/或事件序列,并增加了运算符,使您可以声明性地组合序列,同时抽象出对低级线程,同步,线程安全性,并发数据结构和非线程等事物的关注阻塞I / O

1.简单实例

以上文字的描述很精炼的说出了rxJava是干啥的,以及它的优点,下面我会从简单的使用案例中去剖析rxJava的工作原理。

Observable.create(new ObservableOnSubscribe<Integer>() {@Overridepublic void subscribe(ObservableEmitter<Integer> emitter) throws Exception {emitter.onNext(1);emitter.onComplete();}}).subscribe(new Observer<Integer>() {@Overridepublic void onSubscribe(Disposable d) {Log.e(TAG, "debounce:onSubscribe == 订阅");}@Overridepublic void onNext(Integer integer) {Log.e(TAG, "debounce:onNext ==" + integer);}@Overridepublic void onComplete() {Log.e(TAG, "debounce:onComplete == ");}@Overridepublic void onError(Throwable e) {}});

我们如果使用过rxJava的话,肯定知道当调用了subscribe之后,那么事件就开始发射,观察者就开始消费事件了。那么我们就从subscribe()这个函数开始剖析源码。

    @SchedulerSupport(SchedulerSupport.NONE)@Overridepublic final void subscribe(Observer<? super T> observer) {ObjectHelper.requireNonNull(observer, "observer is null");try {...//省略subscribeActual(observer);} catch (NullPointerException e) { // NOPMDthrow e;} catch (Throwable e) {....//省略throw npe;}}

可以看到当我们调用了subscribe函数之后,就会调用subscribeActual方法,我们进去看看:
在这里插入图片描述
是个observable类的抽象方法,有很多实现,咱们的操作符和create创建出来的observable都实现了这个方法。咱们还是先看下我画好的类图:
在这里插入图片描述
这里就知道了,结合一开头提供的代码知道subscribeActual()函数调用了ObservableCreate的实现方法。再来看下create源码,做了什么:

    @CheckReturnValue@NonNull@SchedulerSupport(SchedulerSupport.NONE)public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {ObjectHelper.requireNonNull(source, "source is null");return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));}

创建一个ObservableCreate返回,参数是ObservableOnSubscribe对象,在进去看看ObservableCreate的实现是怎么实现的。

public final class ObservableCreate<T> extends Observable<T> {final ObservableOnSubscribe<T> source;public ObservableCreate(ObservableOnSubscribe<T> source) {this.source = source;}@Overrideprotected void subscribeActual(Observer<? super T> observer) {CreateEmitter<T> parent = new CreateEmitter<T>(observer);observer.onSubscribe(parent);...source.subscribe(parent);...}

subscribeActual()函数的流程就是,接受到观察者参数之后,将其封装成emitter对象,然后首先调用观察者的onSubscribe函数,在调用source的subscribe()函数。我们从这个实现类的构造函数可以知道source就是我们create方法传入的ObservableOnSubscribe实现类,到这里也就明白了,文章一开头的那段代码,为啥观察者的函数调用顺序是这样的:

19:43:31.725 18200/cn.com.egova.test E/MainActivity: ui当前线程:2
19:43:31.727 18200/cn.com.egova.test E/MainActivity: debounce:onSubscribe == 订阅
19:43:31.727 18200/cn.com.egova.test E/MainActivity: debounce:onNext ==1
19:43:31.727 18200/cn.com.egova.test E/MainActivity: debounce:onComplete == 

当然我们也明白了observer.onSubscribe(parent);这句话其实和Observable.subscribe()这句话是处于同一个线程的,调用链上没有线程的切换,理所应当是同一个线程。

2.操作符系列

2.1 map操作符

map操作符可以说是rxJava中最好理解的一个操作符了,先从这个入手开始剖析操作符的原理。
我们在开头那段代码中加入map操作符之后,如下:
在这里插入图片描述
简单展示,所以实现较为简单,重要还是看原理,我们点击进去看下如何实现的
在这里插入图片描述
看到ObservableMap,根据类图我们知道,它还是继承着observable类,而里面的source参数是ObservableSource(也就是我们map操作符上面的create创建出来的observable对象,它继承自ObservableSource),在点击进去看下实现:

public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {final Function<? super T, ? extends U> function;public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {super(source);this.function = function;}@Overridepublic void subscribeActual(Observer<? super U> t) {source.subscribe(new MapObserver<T, U>(t, function));}

还是熟悉的味道,熟悉的配方,我们看到subscribeActual的实现是,调用source.subscribe()的方法参数封装下游的观察者MapObserver对象,按照流程我们知道source就是observable对象,subscribe方法最后调用的就是observableCreate的subscribeActual方法,这样的话也就形成。

observableCreate类内							  |ObservableMap类内		     |Observer类内
observer.onSubscribe(parent);					  | --> ObservableMap.onSubscribe| Observer.onSubscribe  /emitter.onNext(1);   | --> ObservableMap.onNext	 | --> Observer.onNext
source.subscribe(parent)-->| 					  |                              |\emitter.onComplete();| --> ObservableMap.onComplete | --> Observer.onComplete 

其实的话observableMap的onNext函数不需要去细看不影响流程,但是的话,对于理解代理模式的话还是有好处的,源码如下:

static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {final Function<? super T, ? extends U> mapper;MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {super(actual);this.mapper = mapper;}@Overridepublic void onNext(T t) {...U v;try {v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");} catch (Throwable ex) {...}downstream.onNext(v);}

其实就是把参数传递过来的T类型对象,用mapper转换下生成U类型对象,继续调用下游的onNext方法把U类型对象传递下去。

可以根据最开头的代码,梳理一下整个过程的流程图,如下:
在这里插入图片描述

结合上面的整个代码做个总结,就是我们从create函数开始创建了事件源之后,后面再接着的操作符就是事件订阅过程,操作符将返回一个observable对象,内部成员持有上游的观察者作为source,subscribe函数一旦触发,那么就会封装下游的观察者生成一个observer代理类,用于将上游onNext函数传递过来的事件源对象接受过来,然后调用操作符的操作之后转化成新的对象,再将对象传递给下游的observer代理类,也就是调用一下下游的onNext函数。map操作符是个转化操作,还要其他的好多操作符,比如切换线程,过滤器,flatMap等等操作符,以此类推都可以推导出来rxJava的操作符的一些原理东西。

那么就可以知道操作符map等等调用完成之后,是创建了新的observable被观察者对象(代理了上游),subscribe函数调用之后,是将下游的observer观察者对象包装起来(代理下游),作为参数传递给上游source对象作为subscribe方法的参数。这样那么就可以知道,操作符规定事件流转的方向,并且规定了事件流转过程中具体转换细节,订阅操作只是触发了这一过程。

上面我只用了一个map,想象一下,如果中间加了很多操作符,流程其实也是一样的,只是再上面的我所画的图中间再多加几个泳道而已。

我们再来看下createEmitter类,代码如下:

static final class CreateEmitter<T>extends AtomicReference<Disposable>implements ObservableEmitter<T>, Disposable {final Observer<? super T> observer;CreateEmitter(Observer<? super T> observer) {this.observer = observer;}@Overridepublic void onNext(T t) {if (t == null) {onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));return;}//注意这句话if (!isDisposed()) {observer.onNext(t);}}

其实我们还可以推导出来observer.onSubscribe(Disposable d)函数中的参数Disposable,就是ObservableCreate类内的静态内部类CreateEmitter,我们通常使用Disposable来管理我们的rxJava的生命周期,由此可以推断,Disposable如果调用了销毁的方法,只是将状态设置为销毁状态,isDisposed()返回为false,这样下游就收不到onNext下发的事件。

题外话题disposable

一般的操作符的disposable接口的默认实现是调用上游的dispose()方法,也就是从下游一直回溯到上游,依次调用dispose()方法,那么我们可以推测dispose()方法可以阻断未到达的操作符,比如:在事件发射函数中做了长时间网络请求,在这期间我们调用了disposable.dispose()方法的话,最终回溯到createEmitter的dispose()方法,createEmitter的这个方法是个空方法,因此不会影响方法的执行,但是下游全是DISPOSED状态,所以下游事件不会去接受和消耗,从Emitter已经截断了,比如下面这个代码:

 Log.e(TAG,"主线程ID:"+Thread.currentThread().getId());Observable.create(new ObservableOnSubscribe<Integer>() {@Overridepublic void subscribe(ObservableEmitter<Integer> emitter) throws Exception {Log.e(TAG,"subscribe线程ID:"+Thread.currentThread().getId());emitter.onNext(1);emitter.onComplete();Log.e(TAG,"subscribe方法执行完毕");}})
//        .subscribeOn(Schedulers.io()).map(new Function<Integer, Integer>() {@Overridepublic Integer apply(Integer integer) throws Exception {Log.e(TAG,"map操作符线程ID:"+Thread.currentThread().getId());return integer;}})
//        .observeOn(Schedulers.io()).subscribe(new Observer<Integer>() {@Overridepublic void onSubscribe(Disposable d) {d.dispose();Log.e(TAG,"onSubscribe线程ID:"+Thread.currentThread().getId());}@Overridepublic void onNext(Integer integer) {Log.e(TAG,"onNext线程ID:"+Thread.currentThread().getId());}@Overridepublic void onError(Throwable e) {Log.e(TAG, "debounce:onError == " + e.getMessage());}@Overridepublic void onComplete() {Log.e(TAG,"onComplete线程ID:"+Thread.currentThread().getId());}});

只要onSubscribe一调用就dispose方法,看看打印的什么:

10:12:40.398 17802-17802/cn.com.egova.test E/MainActivity: 主线程ID:2
10:12:40.418 17802-17802/cn.com.egova.test E/MainActivity: onSubscribe线程ID:2
10:12:40.418 17802-17802/cn.com.egova.test E/MainActivity: subscribe线程ID:2
10:12:40.418 17802-17802/cn.com.egova.test E/MainActivity: subscribe方法执行完毕

假如我们将onSubscribeOn操作符加上去,来看subcribeOn操作的具体实现:

@Override
public void subscribeActual(final Observer<? super T> observer) {final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);observer.onSubscribe(parent);parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
static final class SubscribeOnObserver<T> extends AtomicReference<Disposable>implements Observer<T>, Disposable {@Overridepublic void dispose() {DisposableHelper.dispose(upstream);DisposableHelper.dispose(this);}

那么知道,回溯到这个操作符的时候,调用的就是上游的方法和schedule的dispose方法。这里具体是啥我们不去追究,基本意思就是将这个schedule调度器的线程给暂停掉。这样的话,Emitter如果是个耗时操作的话,在耗时操作完成之前dispose掉,那么这个线程就会被停掉,那么耗时操作就会被打断。
代码如下:

Log.e(TAG,"主线程ID:"+Thread.currentThread().getId());Observable.create(new ObservableOnSubscribe<Integer>() {@Overridepublic void subscribe(ObservableEmitter<Integer> emitter) throws Exception {Log.e(TAG,"subscribe线程ID:"+Thread.currentThread().getId());try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();Log.e(TAG,"subscribe打断异常");}emitter.onNext(1);emitter.onComplete();Log.e(TAG,"subscribe方法执行完毕");}}).subscribeOn(Schedulers.io()).map(new Function<Integer, Integer>() {@Overridepublic Integer apply(Integer integer) throws Exception {Log.e(TAG,"map操作符线程ID:"+Thread.currentThread().getId());return integer;}})
//        .observeOn(Schedulers.io()).subscribe(new Observer<Integer>() {@Overridepublic void onSubscribe(Disposable d) {d.dispose();Log.e(TAG,"onSubscribe线程ID:"+Thread.currentThread().getId());}@Overridepublic void onNext(Integer integer) {Log.e(TAG,"onNext线程ID:"+Thread.currentThread().getId());}@Overridepublic void onError(Throwable e) {Log.e(TAG, "debounce:onError == " + e.getMessage());}@Overridepublic void onComplete() {Log.e(TAG,"onComplete线程ID:"+Thread.currentThread().getId());}});

打印的结果如下:

10:26:42.595 19515-19515/cn.com.egova.test E/MainActivity: 主线程ID:2
10:26:42.664 19515-19515/cn.com.egova.test E/MainActivity: onSubscribe线程ID:2

我们发现直接就没有机会执行Emitter的方法了,假设我们的d.dispose()方法延迟0.5秒执行的话,

    new Thread(new Runnable() {@Overridepublic void run() {try {Thread.sleep(500);} catch (InterruptedException e) {e.printStackTrace();}dp.dispose();}}).start();

结果打印的是:

10:30:20.794 19978-19978/cn.com.egova.test E/MainActivity: 主线程ID:2
10:30:20.864 19978-19978/cn.com.egova.test E/MainActivity: onSubscribe线程ID:2
10:30:20.870 19978-20026/cn.com.egova.test E/MainActivity: subscribe线程ID:763
10:30:21.385 19978-20026/cn.com.egova.test E/MainActivity: subscribe打断异常
10:30:21.385 19978-20026/cn.com.egova.test E/MainActivity: subscribe方法执行完毕

结果也符合预期。
取消订阅的链式结构如下:
在这里插入图片描述

基本的原理性已经剖析完了,下一篇文章我将剖析一下线程切换的问题。

http://www.jmfq.cn/news/4999735.html

相关文章:

  • 做药的文献一般在哪些网站查找/关键词优化精灵
  • 自适应网站的图做多大 怎么切/关键字挖掘机爱站网
  • 做音乐网站的目地/中国足彩网竞彩推荐
  • 福州工厂网站建设定制服务/专业做网站设计
  • c web网站开发浏览器/广州网站制作公司
  • 投放广告网站/宁波营销型网站建设优化建站
  • 开网站做商城怎么样/哪里可以接广告
  • 长春网站建设seo/百度搜索大数据查询
  • 网站收录申请/百度搜索什么关键词排名
  • wap手机网站建设公司/沈阳网站优化
  • 做王境泽表情的网站/网店搜索引擎优化的方法
  • 个人网站需要买服务器吗/广州专门做seo的公司
  • 国内python 做的网站/东莞谷歌推广公司
  • 大丰哪家专业做网站/商业软文
  • 合肥瑶海区网站建设价格/百度平台商家我的订单查询
  • 比较好看的网站设计/百度app营销软件
  • 海南省建设网站的公司/百度一下就知道官方网站
  • 宁国市网站关键词优化外包/全网营销国际系统
  • 苏州网站排名/seo翻译
  • 做啥英文网站赚钱/旅行网站排名前十名
  • 前端网站建设插件/友链交换网站源码
  • 有没有免费的网站空间/优化疫情防控 这些措施你应该知道
  • 网站域名怎么进行实名认证/seo搜索引擎优化工资
  • 做视频网站注意什么/网络营销到底是干嘛的
  • 商丘网站制作案例/搜索引擎营销的常见方式
  • 江苏省网站备案查询/常用的网络推广的方法有哪些
  • 桂林做网站哪家好/百度推广一般要多少钱
  • 沈阳网站开发公司/网站代运营多少钱一个月
  • 亚马逊网站建设做什么/网络推广什么做
  • vb做的网站怎么调试/医院网站建设方案