import java.util.Observable; import java.util.Observer; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class ObserverExample { static class IntegerObservable extends Observable implements Runnable{ @Override public void run() { for (int i = 1; i <= 10; i++) { setChanged(); notifyObservers(i); //push 역할 // int i = it.next(); // pull 역할 } } } @SuppressWarnings("Deprecation") public static void main(String[] args) { // Source -> (Event/Data Source) -> Observer Observer ob = new Observer() { @Override public void update(Observable o, Object arg) { System.out.println(Thread.currentThread().getName()+" arg = " + arg); } }; IntegerObservable io = new IntegerObservable(); io.addObserver(ob); // io.run(); ExecutorService es = Executors.newSingleThreadExecutor(); es.execute(io); System.out.println(Thread.currentThread().getName()+" EXIT"); es.shutdown(); } }
Observable의 한계
- 작업 완료 Complete 에대한 처리 부족
- 예외 처리에 대한 아이디어 부족
이를 확장하여 한계를 극복 → Reactive Programming !
리액티브 프로그래밍의 큰 축 - 두가지
- RxJava
- MS/ Netflix 사람들이 같이 만듬
- 에릭 마이어
- Reactive Streams - Java 9
- Reactive Streams 의 핵심 인터페이스

- SubScriber → Publisher 나 너를 구독할래 : subscribe 호출
- Publisher : Subscription을 만들어 onSubscribe 호출
- Subscription : 구독 자체에 대한 정보/ Back Pressure을 처리 (속도차이)
- Subscriber → request : 요청을 할 수 도 있음 (request 메서드)
API Components
The API Consists of the following components that are required to be provied by Reactive Stream implementations:
- Publisher
- A Publisher is a provider of a potentially unbounded number of sequenced elements, publishing them according to the demand received from its Subscriber(s).
- Publisher.subscribe(Subscriber) 를 호출 하기 위해서 아래의 프로토콜 대로 메소드 호출이 이루어 져야함
onSubscribe onNext* (onError | onComplete)?
- Subscriber
- Subscription
- Processor
package demo.tobyspringtv.episode_5.pubsub; import java.util.concurrent.Flow.Publisher; import java.util.concurrent.Flow.Subscriber; public class PubSubMain { // Publisher <- Observable // Subscriber <- Observer public static void main(String[] args) { /* onSubscribe onNext* (onError | onComplete)? */ Publisher<Integer> p = new CustomPublisher(); Subscriber<Integer> s = new CustomSubscriber<>(); p.subscribe(s); } }
package demo.tobyspringtv.episode_5.pubsub; import java.util.Arrays; import java.util.Iterator; import java.util.concurrent.Flow; public class CustomPublisher implements Flow.Publisher<Integer> { Iterable<Integer> iter = Arrays.asList(1, 2, 3, 4, 5); final Iterator<Integer> it =iter.iterator(); @Override public void subscribe(Flow.Subscriber<? super Integer> subscriber) { subscriber.onSubscribe(new Flow.Subscription() { @Override public void request(long n) { try{ while(n -- > 0){ if(it.hasNext()){ subscriber.onNext(it.next()); }else{ subscriber.onComplete(); break; } } }catch (RuntimeException e){ subscriber.onError(e); } } @Override public void cancel() { } }); } }
package demo.tobyspringtv.episode_5.pubsub; import java.util.concurrent.Flow; import java.util.concurrent.Flow.Subscription; public class CustomSubscriber<T> implements Flow.Subscriber<T> { Subscription subscription; @Override public void onSubscribe(Subscription subscription) { System.out.println("onSubscribe"); this.subscription = subscription; subscription.request(1); } @Override public void onNext(T item) { System.out.println("onNext " + item); this.subscription.request(1); } @Override public void onError(Throwable throwable) { System.out.println("onError"); } @Override public void onComplete() { System.out.println("onComplete"); } }
onSubscribe onNext 1 onNext 2 onNext 3 onNext 4 onNext 5 onComplete Process finished with exit code 0