Skip to content

Instantly share code, notes, and snippets.

@jooyunghan
Last active December 17, 2018 12:28
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jooyunghan/81cd9eeed8ffbc20e02c5384239569ff to your computer and use it in GitHub Desktop.
Save jooyunghan/81cd9eeed8ffbc20e02c5384239569ff to your computer and use it in GitHub Desktop.
개미수열 10장 리액티브 스트림

10장 리액티브 스트림

이 장에서는 개미 수열을 웹으로 출력해 보자. 개미 수열의 100 번째 줄을 알고 싶으면 http://ant-seq.com/100 에서 볼 수 있게. 그러는 중에 뭔가 새로운 개념을 하나 살펴볼 것이다. 이 장에서 살펴볼 개념은 바로 리액티브 스트림이다.

노드로 시작

노드(Node)를 이용하여 개미 수열을 출력하는 서버를 만들어보자. "Hello World" 예제를 조금 바꿔서 빠르게 구현해 보자.

{title="개미 수열 서버(Node)", lang="javascript"}

const http = require("http");
const url = require("url");
const ant = require("./ant-regex");

const hostname = "127.0.0.1";
const port = 3000;

const server = http.createServer((req, res) => {
  res.statusCode = 200;
  res.setHeader("Content-Type", "text/plain");
  res.end(ant(+url.parse(req.url).path.substr(1)) + "\n");
});

server.listen(port, hostname, () => {
  console.log(`개미 수열 running at http://${hostname}:${port}/`);
});

서버를 실행한 다음 curl 등으로 잘 동작하는지 확인해 보자.

$ curl http://127.0.0.1:3000/10
11131221133112132113212221

10 같은 작은 입력으로 동작을 확인했다. 하지만 100 번째 줄을 출력하기 위해 http://127.0.0.1:3000/100 요청을 전달하면 아무런 응답이 없다. 이 예제는 2장에서 정규표현식을 이용하여 구현한 개미 수열 함수를 이용하였다. 그리고 우리는 이미 정규표현식으로 구현한 버전이 가지는 한계를 알고 있다. 한 줄 씩 완전히 계산하다가 메모리가 넘치는 문제가 그것이다.

그런데 웹으로 바꾸면서 또다른 문제가 생겼다. 100 번째 줄을 계산하지 못하는 것은 그렇다 치고 그동안 다른 요청을 전혀 처리하지 못한다는 점이다. 좀더 그럴듯한 모습의 개미 수열 서비스를 위해 문제를 해결해 보자.

코루틴 개미 수열

백만 번째 줄을 출력하기 위해 시도한 여러가지 방법 중에서 끝판왕은 코루틴 개미 수열이었다. 정규표현식 방법처럼 한 줄을 통째로 처리하지도 않고, 이터레이터/제너레이터를 이용하는 경우처럼 스택이 넘치는 문제도 피할 수 있다.

코루틴 개미 수열은 제너레이터 함수로 되어 있어서 이터레이터 인터페이스를 이용하여 해당 줄의 내용을 출력할 수 있었다. 여기서는 하나씩 값을 읽어서 res에 쓰기만 하면 된다.

{lang="javascript"}

const ant = require("./ant-coroutine");

function handleAnt(line, req, res) {
  const g = ant(line);
  // write values from g
}

이터레이터를 순회하는 쉬운 방법은 for-of를 이용하는 것이다.

{title="for-of 출력하기", lang="javascript"}

function handleAnt(line, req, res) {
  for (const value of ant(line)) {
    res.write(String(value)); // write as String
  }
  res.end();
}

첫 번째 문제를 해결한 코드를 금방 구현할 수 있었다. 그러나 여전히 10 번째 줄만 출력할 뿐 100 번째 줄을 출력하지 못한다.

이건 노드 실행 환경, 혹은 JavaScript IO의 특수성 때문이다. res.write() 함수는 실제로 클라이언트에 어떤 값을 쓰는 IO 함수가 아니다. handleAnt() 함수가 종료할 때까진 실제 IO가 발생하지 않고 버퍼에 쌓일 뿐이다. 그러다보니 기껏 코루틴 개미 수열 함수를 이용하여 백만 번째 줄을 계산할 수는 있지만 이를 다시 버퍼에 쌓다가 메모리가 넘쳐버리고 만 것이다. res가 가리키는 ServerResponse는 스트리밍을 지원하지만 쉴새없이 write() 함수를 호출하기만 해서는 ServerResponse가 실제 클라이언트에 그 값을 전달할 기회가 얻지 못한다.

값을 만들어서 전달하는데, 전달 받는 쪽이 바로 처리할 수 없는 상황이다. 이런 경우 우리가 취할 수 있는 전략은 둘 중 하나다.

  1. 전달 받은 값 버리기
  2. 처리 능력을 넘었다고 알리기

노드는 2번 전략을 사용하고 있다. res.write() 함수 설명을 보면 반환값이 boolean인데, 내부 버퍼가 넘치는 경우 false를 반환한다. 반환값을 체크하여 버퍼가 넘치는 경우에는 쓰기를 중지하고 ServerResponse에게 버퍼를 비울(실제 IO 동작으로 클라이언트에게 전달할) 기회를 제공해야 한다.

{title="출력 속도 조절", lang="javascript"}

function handleAnt(line, req, res) {
  const g = ant(line);
  resume();

  function resume() {
    while (true) {
      const { value, done } = g.next();
      if (done) break;
      if (!res.write(String(value))) {
        res.once("drain", resume);
        return;
      }
    }
    res.end();
  }
}

ServerResponse 스트림은 내부 버퍼를 비우고 나면 "drain" 이벤트를 발생시켜서 다시 데이터를 생성할 수 있게 한다. "채우고, 꽉차면 비우고, 또 채우고, ..." 이렇게 진행된다.

이제 백만 번째 줄을 출력하는 중에도 추가 요청을 처리할 수 있게 되었다.

배압

데이터를 생산하는 쪽에서 소비하는 쪽보다 더 빨리 만들어내는 경우를 "배압(backpressure)"이라고 한다. 입출력 스트리밍에서는 배압을 잘 처리하는 것이 중요하다. 배압이 발생할 때 이를 적절히 대응하지 않는다면 시스템이 망가질 수 밖에 없다.

노드의 스트림은 입력 스트림과 출력 스트림, 혹은 읽기 스트림과 쓰기 스트림이 있는데, 각각이 데이터를 생산하는 쪽과 소비하는 쪽이라고 할 수 있다. 이 두 스트림을 r.pipe(w) 함수로 연결하면, 버퍼 한계를 넘지 않도록 두 스트림의 읽기 쓰기 속도가 제어되면서 효율적으로 동작한다.

이미 개미 수열 웹 서버는 잘 동작하지만 노드 홈페이지의 충고를 충실히 따라서 개미 수열을 읽기 스트림으로 포장한 다음 pipe()로 연결해 보자.

{title="이터레이터를 읽기 스트림으로 감싸기", lang="javascript"}

const { Readable } = require("stream")

function handleAnt(line, req, res) {
  iterableToReadable(ant(line)).pipe(res);
}

function iterableToReadable(iter) {
  const g = iter[Symbol.iterator]();
  return Readable({
    read(size) {
      try {
        const { value, done } = g.next();
        if (done) this.push(null);
        else process.nextTick(() => this.push(String(value)));
      } catch (e) {
        this.emit("error", e);
      }
    }
  });
}

직접 처리하던 루프는 사라지고, 요청(read())에 따라 데이터를 전달하는 부분만 남았다. read(size)로 전달되는 size를 힌트 삼아 한번에 좀더 많은 데이터를 전달할 수도 있다.

데이터 소스에 해당하는 읽기 스트림을 구현하는데 사용한 인터페이스를 살펴보자.

  • readable.read(size): 데이터 요청. 읽기 스트림을 사용하는 측에서 호출
  • readable.push(value): 데이터 전달.
  • readable.push(null): 스트림이 종료되었음을 통지
  • readable.emit(error): 오류 발생 통지

읽기 스트림 입장에서 read()는 호출당하는 함수, 나머지 두 함수(push/emit)는 능동적으로 호출하는 함수이다.

리액티브 스트림

노드의 읽기 쓰기 스트림이 보여주는 이러한 방식(생산자와 소비자 간의 배압을 고려한 데이터 전송)을 표준화하려는 움직임이 있다. 리액티브 스트림(reactive stream)이라고 부르며 가장 기초적인 연산을 인터페이스 형태로만 정의해 놓았다.

리액티브 스트림 표준에서 정의한 인터페이스를 보면 앞서 살펴본 노드 스트림의 인터페이스와 유사하다.1

{title="리액티브 스트림 인터페이스(JVM)", lang="java"}

interface Publisher<T> {
  void subscribe(Subscriber<T>);
}

interface Subscriber<T> {
  void onSubscribe(Subscription);
  void onNext(T);
  void onError(Throwable);
  void onComplete();
}

interface Subscription {
  void request(long);
  void cancel();
}

Publisher는 값을 생산하고, Subscriber는 값을 전달받는다. 대략의 흐름은 아래와 같다.

  1. Publishersubscribe() 메쏘드로 Subscriber를 등록한다.
  2. Publisher는 등록된 SubscriberonSubscribe() 메쏘드를 불러 등록되었음을 알린다.
  3. Subscribe는 등록되면 Subscription.request(n)로 데이터 전달을 요청한다. 노드 스트림의 Readable.read(size) 요청과 비슷하다.
  4. Publisher는 데이터 요청을 받으면 SubscriberonNext(value) 메쏘드르 호출하여 값을 하나씩 전달한다. Readable.push(value)와 비슷하다.
  5. 데이터를 생산하다가 오류가 발생하면 Subscriber.onError(e)로 오류를 전달하면서 스트림을 종료한다. Readable.emit("error")와 비슷하다.
  6. 데이터가 더 없을 때는 Subscriber.onComplete()로 스트림의 끝을 알린다. Readable.push(null)과 비슷하다.
  7. SubscriberonNext(value)로 값을 전달받으터서 추가로 request(n)를 호출하여 데이터를 요청할 수 있다.
  8. Subscriber는 데이터가 더 필요없을 때 cancel()을 호출하여 데이터 생산 중단을 요청한다.

개미 수열은 이미 제너레이터(혹은 이터레이터) 형태로 준비되어 있으니 이를 Publisher 인터페이스로 포장해 보자.

{title="이터레이터를 Publisher로 감싸기", lang="javascript"}

function handleAnt(line, req, res) {
  publisherToReadable(iterableToPublisher(ant(line))).pipe(res);
}

function iterableToPublisher(iter) {
  return {
    subscribe(subscriber) {
      const g = iter[Symbol.iterator]();
      let cancelled = false;

      subscriber.onSubscribe({
        request(n) {
          process.nextTick(produce, n);
        },
        cancel() {
          cancelled = true;
        }
      });

      function produce(n) { ... }
    }
  };
}

앞서 설명한 대략의 흐름 중 Publisher 에 해당하는 내용들이 구현되었다.

  • Publishersubscribe(subscriber) 메쏘드를 가진다.
  • subscribe()로 넘어온 새로운 subscriber에 대해 onSubscribe() 메쏘드를 호출하여 구독되었음을 알린다.
  • 이때, subscriber가 데이터를 요청하거나 중지할 수 있는 수단으로서 subscription객체를 만들어서 넘긴다.
  • subscription 객체의 request(n)가 호출되면 데이터를 만들어 전달한다.2

위의 코드로 Publisher의 기본 뼈대는 만들어졌고, 데이터 전달을 맡은 produce(n) 함수는 아래처럼 구현할 수 있다.3

{title="produce(n): 이터레이터에서 읽은 값 subscriber에 넘기기", lang="javascript"}

function produce(n) {
  try {
    const { value, done } = g.next();
    if (done) {
      subscriber.onComplete();
      return;
    }
    if (!cancelled) {
      subscriber.onNext(value);
      process.nextTick(produce, n - 1);
    }
  } catch (e) {
    cancelled = true;
    subscriber.onError(e);
  }
}

노드는 Publisher를 바로 사용할 수 없으니 이번에도 읽기 스트림으로 포장해 보자. Publisher가 만들어내는 데이터를 받으려면 Subscriber를 만들어서 등록해야 한다.

{title="Publisher를 읽기 스트림으로 감싸기", lang="javascript"}

function publisherToReadable(publisher) {
  let subscription;
  return Readable({
    read(size) {
      if (!subscription) {
        const subscriber = { ... };
        publisher.subscribe(subscriber);
      } else {
        subscription.request(1);
      }
    }
  });
}

읽기 스트림이 처음 read() 요청을 받으면 그때 publishersubscriber를 등록한다. 이미 등록한 다음 발생하는 read() 요청에 대해서는 request()로 요청을 전달하기만 하면 된다. subscriberpublisher 인터페이스를 읽기 스트림 인터페이스로 변환하는 역할을 한다.

{title="SubscriberPublisher의 데이터를 읽기 스트림으로 전달", lang="javascript"}

const readable = this;
const subscriber = {
  onSubscribe(s) {
    subscription = s;
    s.request(1);
  },
  onNext(value) {
    if (readable.push(String(value))) {
      subscription.request(1);
    }
  },
  onError(e) {
    readable.emit("error", e);
  },
  onComplete() {
    readable.push(null);
  }
};

처음 등록되었을 때 데이터를 요청하고, onNext()로 데이터를 받을 때마다 이를 readable.push()로 넘기면서, 성공하면(배압이 없으면) 다음 데이터를 요청한다.4

연습. 쓰기 스트림을 Subscriber로 감싸기

Publisher를 읽기 스트림으로 감싸는 이유는 ServerResponse와 같은 쓰기 스트림에 연결하기 위해서였다. 반대로 쓰기 스트림을 Subscriber로 감싸는 것은 어떨까? 구현해 보자.

{lang="javascript"}

function handleAnt(line, req, res) {
  iterableToPublisher(ant(line)).subscribe(writableToSubscriber(res));
}

비동기 이터레이터

이터레이터는 데이터 요청에 동기적으로 값을 제공하는 인터페이스이고, Publisher는 요청에 대해 비동기적으로 값을 제공하는 인터페이스다.

  • iterator.next() → 즉시 { value, done } 반환
  • subscription.request() → 나중에 publishersubcriber.onNext(value) 호출

이런 이유로 Publisher를 비동기 이터레이터(async iterator)라고 볼 수도 있다. 노드의 읽기 스트림도 마찬가지라서 실제로 비동기 이터레이터로 사용할 수 있다.

{title="for-await-of로 비동기 이터레이터 순회하기", lang="javascript"}

async function handleAnt(line, req, res) {
  for await (let v of iterableToReadable(ant(line))) {
    res.write(v);
  }
  res.end();
}

ant() 이터레이터를 for-of로 순회하면서 res.write()로 작성했던 예제와 달리, 비동기 이터레이터 순회는 백만 번째 개미 수열도 잘 출력한다.5 for-await 루프 바디마다 ServerResponse가 꽉 찬 버퍼를 비울 수 있는 기회가 주어지기 때문이다.

정리

처음에 이터레이터가 있었다. 이터레이터를 순회/중단/재개하며 직접 쓰기 스트림에 값을 적었다. 노드는 읽기 스트림과 쓰기 스트림의 pipe() 연결을 통해 배압을 조절해 주므로, 이터레이터를 읽기 스트림으로 포장하여 쓰기 스트림에 연결할 수 있었다. 읽기 스트림과 리액티브 스트림의 유사성을 살펴본 뒤, 이터레이터와 읽기 스트림 사이에 Publisher 인터페이스를 끼워넣었다. 또 읽기 스트림은 비동기 이터레이터 인터페이스 변환도 가능하다.

  1. 이터레이터
  2. 이터레이터 - 읽기 스트림
  3. 이터레이터 - Publisher - 읽기 스트림
  4. 이터레이터 - 읽기 스트림 - 비동기 이터레이터

여러가지 모양이 있었지만 개미 수열 서비스에서 핵심이 되는 것이 바로 비동기 스트림이었다. 이중에서도 리액티브 스트림의 경우는 단순히 Publisher 인터페이스에 그치지 않고, 대부분의 구현체가 다양한 유틸리티 메쏘드들을 제공하여 스트림을 조작할 수 있게 해 준다.

아쉽지만 이 장에서는 비동기 스트림 인터페이스를 살펴본 것으로 마무리 짓는다.

Footnotes

  1. JVM용 인터페이스(https://github.com/reactive-streams/reactive-streams-jvm). 타입 인자를 간소화했다.

  2. SubscriberonNext()에서 request()를 호출하면 request()onNext()request()onNext() → .. 이렇게 콜스택이 쌓이게 된다. onNext()가 중첩 호출되지 않도록 process.nextTick() 을 사용하여 트램폴리닝으로 동작하게 한다.

  3. 재귀호출처럼 보이지만 process.nextTick()을 통해 트램폴리닝으로 동작한다.

  4. read(size)로 전달된 sizerequest(n)는 약간 다른 의미를 가진다. n은 요청 데이터 갯수이며, publisher는 요청받은 것보다 더 많이 생산해서는 안된다. size는 바이트 크기를 나타내며, 추가로 push() 할 수 있는 데이터 크기에 대한 힌트이다.

  5. 클라이언트가 먼저 연결을 끊어버리는 경우 불필요한 순회를 중지하려면 if (res.socket.destroyed) break; 같은 탈출 조건을 추가하면 된다.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment