RxJava2中Observable的创建方法介绍

创建Observable

简单介绍一些Observable静态工厂方法和公有方法的用法。

create

示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
e.onNext("one");
e.onNext("two");
int random = new Random().nextInt(10);
if (random >= 5) {
e.onComplete();
} else {
e.onError(new Throwable("random is less than 5"));
}
}
});
observable.subscribe(new DefaultObserver<String>() {
@Override
public void onNext(@NonNull String s) {
System.out.println("s = " + s);
}
@Override
public void onError(@NonNull Throwable e) {
System.out.println("CreateActivity.onError");
e.printStackTrace();
}
@Override
public void onComplete() {
System.out.println("CreateActivity.onComplete");
}
});

这段代码在输出one和two后生成一个随机数,如果这个随机数大于等于5则调用onComplete否则调用onError。

使用create方法创建Observable时需要在subscribe方法中手动调用onNext、onComplete、onError。

onNext、onComplete、onError分别调用DefaultObserver中对应的方法。

onComplete和onError调用后Observer不再接收事件。

注意:onComplete后继续调用onError以及onError后继续调用onError会报错。

just

示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
Observable<String> observable = Observable.just("one");
observable.subscribe(new DefaultObserver<String>() {
@Override
public void onNext(@NonNull String s) {
System.out.println("s = " + s);
}
@Override
public void onError(@NonNull Throwable e) {
}
@Override
public void onComplete() {
System.out.println("CreateActivity.onComplete");
}
});

这段代码在输出one后调用onComplete方法。

使用just方法能快速创建一个与参数类型一致的Observable,非常方便。

Rxjava2重载了10个just方法,它们的区别只是参数的数目不一样。

fromArray/fromIterable/fromCallable

fromArray

示例代码:

1
2
3
4
5
6
7
8
9
String[] strings = {"one", "two", "three"};
Observable<String> observable = Observable.fromArray(strings);
observable.subscribe(new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {
System.out.println("s = " + s);
}
});

这段代码将会输出one、two、three。

使用fromArray方法将创建一个Observable实例,这个实例会按顺序释放数组中的对象。

fromIterable

示例代码:

1
2
3
4
5
6
7
8
9
10
11
List<String> list = new ArrayList<>();
list.add("one");
list.add("two");
Observable.fromIterable(list)
.subscribe(new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {
System.out.println("s = " + s);
}
});

这段代码将会输出one、two。

fromIterable类似fromArray,也会创建一个Observable实例,这个实例会按顺序释放对象。

fromCallable

有这样一个Person类:

1
2
3
4
5
6
7
8
9
10
11
public class Person {
private String name = "nobody";
public void setName(String name) {
this.name = name;
}
public Observable<String> getObservableByJust() {
return Observable.just(name);
}
}

你觉得下面这段代码的运行结果是什么?

1
2
3
4
5
6
7
8
9
10
11
Person person = new Person();
Observable<String> observable = person.getObservableByJust();
person.setName("jack");
observable.subscribe(new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {
System.out.println("s = " + s);
}
});

如果你猜是jack的话,你就错了。这段代码将会输出nobody。

just方法会在创建Observable的时候就储存给定的参数,而不是在Observable被subscribe的时候再去调用参数。

如果你想要让Observable返回的值是被subscribe时的值,可以用fromCallable方法。

1
2
3
4
5
6
7
8
public Observable<String> getObservableByFromCallable() {
return Observable.fromCallable(new Callable<String>() {
@Override
public String call() throws Exception {
return name;
}
});
}

fromCallable创建的Observable会在被subscribe时调用给定的回调方法,这样我们就能顺利获得想要的输出了。

defer

defer方法类似fromCallable方法,它们创建的Observable都会在被subscribe的时候调用给的回调方法。

它们的区别在于这个回调方法返回的对象的类型不同。

使用defer方法实现fromCallable的效果:

1
2
3
4
5
6
7
8
public Observable<String> getObservableByDefer() {
return Observable.defer(new Callable<ObservableSource<? extends String>>() {
@Override
public ObservableSource<? extends String> call() throws Exception {
return Observable.just(name);
}
});
}

empty/never/error

这三个方法在测试的时候比较有用。

empty

empty方法会创建一个不释放任何item而是直接调用onComplete的Observable。

never

never方法会创建一个既不释放任何item也不释放onComplete或onError通知的Observable。

error

error方法会创建一个不释放任何item并且在被subscribe时调用onError的Observable。

interval/intervalRange

interval

示例代码:

1
2
3
4
5
6
7
8
Observable<Long> observable = Observable.interval(10, 1, TimeUnit.SECONDS);
observable.subscribe(new Consumer<Long>() {
@Override
public void accept(@NonNull Long aLong) throws Exception {
System.out.println("aLong = " + aLong);
}
});

这段代码将会在等待10秒后每过1秒输出一个数字,这个数字从0开始每次递增值为1。

intervalRange

intervalRange是interval方法的扩展,多了一个计数的起始数字和计数的次数作为参数。

示例代码:

1
2
3
4
5
6
7
8
Observable<Long> observable = Observable.intervalRange(100, 10, 10, 1, TimeUnit.SECONDS);
observable.subscribe(new Consumer<Long>() {
@Override
public void accept(@NonNull Long aLong) throws Exception {
System.out.println("aLong = " + aLong);
}
});

这段代码将会在等待10秒后每过1秒输出一个数字,这个数字从100开始每次递增值为1,到达109后终止。

range

示例代码:

1
2
3
4
5
6
7
8
Observable<Integer> observable = Observable.range(0, 10);
observable.subscribe(new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer integer) throws Exception {
System.out.println("integer = " + integer);
}
});

这段代码将会输出从0开始到9结束的一串数字。

如果对输出数值的范围有要求的话,可以使用rangeLong方法。

repeat

示例代码:

1
2
3
4
5
6
7
8
Observable<String> observable = Observable.just("one").repeat();
observable.subscribe(new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {
System.out.println("s = " + s);
}
});

这段代码会不停的输出one。

repeat方法可以接受一个数字作为重复的次数。

值得注意的是,repeat方法并不是一个静态工厂方法。

startWith

示例代码:

1
2
3
4
5
6
7
8
Observable<String> observable = Observable.just("one", "two").startWith("hello");
observable.subscribe(new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {
System.out.println("s = " + s);
}
});

这段代码将会输出hello、one、two。

startWith方法会在给定的Observable前插入一段同样类型的item。

值得注意的是,startWith方法并不是一个静态工厂方法。

timer

示例代码:

1
2
3
4
5
6
7
8
Observable<Long> observable = Observable.timer(10, TimeUnit.SECONDS);
observable.subscribe(new Consumer<Long>() {
@Override
public void accept(@NonNull Long aLong) throws Exception {
System.out.println("aLong = " + aLong);
}
});

这段代码会在等待10秒后输出数字0,这个数字无法改变。

参考