HomeAboutMeBlogGuest
© 2025 Sejin Cha. All rights reserved.
Built with Next.js, deployed on Vercel
📝
남득윤 학습 저장소
/
Toby-Tv
Toby-Tv
/
🧲
Reactive Programming
🧲

Reactive Programming

 
import java.util.Observable;
import java.util.Observer;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Demonstrates the push-style Observer pattern with {@link Observable} and {@link Observer}.
 *
 * <p>An {@link IntegerObservable} is handed to a single-thread executor, so the notifications are
 * printed with the executor thread's name while the calling thread prints {@code EXIT}.
 */
public class ObserverExample {

    /**
     * Event source that pushes the integers 1 through 10 to every registered observer when run.
     */
    @SuppressWarnings("deprecation") // extends java.util.Observable, which the JDK marks deprecated
    static class IntegerObservable extends Observable implements Runnable {
        @Override
        public void run() {
            for (int i = 1; i <= 10; i++) {
                // notifyObservers only delivers when the changed flag is set, so set it every time.
                setChanged();
                notifyObservers(i); // push: the source hands the value to the observer
                // Contrast with pull, where the consumer asks for it: int i = it.next();
            }
        }
    }

    /**
     * Wires one printing observer to an {@link IntegerObservable} and runs the source on an
     * executor thread.
     *
     * @param args unused
     */
    // The lint key is the lowercase "deprecation"; other spellings are not recognised by javac.
    @SuppressWarnings("deprecation")
    public static void main(String[] args) {
        // Source -> (Event/Data Source) -> Observer
        Observer ob = new Observer() {
            @Override
            public void update(Observable o, Object arg) {
                System.out.println(Thread.currentThread().getName() + " arg = " + arg);
            }
        };

        IntegerObservable io = new IntegerObservable();
        io.addObserver(ob);

        // Calling io.run() directly would deliver every notification on the calling thread.
        ExecutorService es = Executors.newSingleThreadExecutor();
        es.execute(io);

        System.out.println(Thread.currentThread().getName() + " EXIT");
        // shutdown() lets the already-submitted task finish and then releases the worker thread.
        es.shutdown();
    }
}
ObserverExample.java
Observable의 한계
  • 작업 완료(Complete)에 대한 처리 부족
  • 예외 처리에 대한 아이디어 부족
 
💡
이를 확장하여 한계를 극복 → Reactive Programming !
 
 
리액티브 프로그래밍의 큰 축 - 두가지
  • RxJava
    • MS / Netflix 사람들이 같이 만듦
    • 에릭 마이어
    • Reactive Streams - Java 9
    • Reactive Streams
      http://www.reactive-streams.org
  • Reactive Streams 의 핵심 인터페이스
 
The Reactive Streams Contract
The Reactive Streams Contract
  1. Subscriber → Publisher 나 너를 구독할래 : subscribe 호출
  1. Publisher : Subscription을 만들어 onSubscribe 호출
    1. Subscription : 구독 자체에 대한 정보 / Back Pressure를 처리 (속도 차이)
  1. Subscriber → request : 요청을 할 수도 있음 (request 메서드)
 
 
API Components
The API consists of the following components that are required to be provided by Reactive Streams implementations:
  1. Publisher
      • A Publisher is a provider of a potentially unbounded number of sequenced elements, publishing them according to the demand received from its Subscriber(s).
      • Publisher.subscribe(Subscriber) 호출에 대한 응답으로, Subscriber의 메소드 호출은 아래의 프로토콜대로 이루어져야 함
      onSubscribe onNext* (onError | onComplete)?
      onSubscribe - 한번 필수 → onNext (0~N회) → (onError | onComplete) 배타적으로 0~1회 호출
  1. Subscriber
  1. Subscription
  1. Processor
 
package demo.tobyspringtv.episode_5.pubsub; import java.util.concurrent.Flow.Publisher; import java.util.concurrent.Flow.Subscriber; public class PubSubMain { // Publisher <- Observable // Subscriber <- Observer public static void main(String[] args) { /* onSubscribe onNext* (onError | onComplete)? */ Publisher<Integer> p = new CustomPublisher(); Subscriber<Integer> s = new CustomSubscriber<>(); p.subscribe(s); } }
PubSub.java
 
package demo.tobyspringtv.episode_5.pubsub; import java.util.Arrays; import java.util.Iterator; import java.util.concurrent.Flow; public class CustomPublisher implements Flow.Publisher<Integer> { Iterable<Integer> iter = Arrays.asList(1, 2, 3, 4, 5); final Iterator<Integer> it =iter.iterator(); @Override public void subscribe(Flow.Subscriber<? super Integer> subscriber) { subscriber.onSubscribe(new Flow.Subscription() { @Override public void request(long n) { try{ while(n -- > 0){ if(it.hasNext()){ subscriber.onNext(it.next()); }else{ subscriber.onComplete(); break; } } }catch (RuntimeException e){ subscriber.onError(e); } } @Override public void cancel() { } }); } }
CustomPublisher.java
package demo.tobyspringtv.episode_5.pubsub; import java.util.concurrent.Flow; import java.util.concurrent.Flow.Subscription; public class CustomSubscriber<T> implements Flow.Subscriber<T> { Subscription subscription; @Override public void onSubscribe(Subscription subscription) { System.out.println("onSubscribe"); this.subscription = subscription; subscription.request(1); } @Override public void onNext(T item) { System.out.println("onNext " + item); this.subscription.request(1); } @Override public void onError(Throwable throwable) { System.out.println("onError"); } @Override public void onComplete() { System.out.println("onComplete"); } }
CustomSubscriber.java
 
onSubscribe onNext 1 onNext 2 onNext 3 onNext 4 onNext 5 onComplete Process finished with exit code 0
실행 결과