[kotlin] 바닥부터 배우는 코루틴 #1

개요

코틀린의 비동기처리 모델은 코루틴입니다. 코루틴은 현대 언어들이 차용하고 있는 다양한 언어적 기능을 복합적으로 이용하여 만들어낸 개념입니다.
가장 비슷한 개념은 ES2018에 반영되어 있는 Asynchronous Iterators로 많은 언어에서 지원되는 제네레이터와 비동기를 동기처럼 사용할 수 있게 해주는 c#의 async를 동시에 결합한 것입니다. 프로그래밍 이론상으로는 CPS라고 해서 Continuation Passing Style이라 합니다. CPS는 다양한 응용 버전이 있는데 코틀린의 코루틴도 넓은 범주에서는 여기에 해당됩니다.

하지만 내부적으로 보다 복잡한 추상 개념들을 사용하고 있어 이해하기 어려울 뿐 아니라 오해하기도 쉽습니다.

본 글은 코틀린 공식 레포지토리에서 제공해주는 coroutines.md를 기준으로 보다 알기 쉬운 예시와 설명으로 진행될 예정입니다.

JS를 통한 컨티뉴에이션 개념 연습

코루틴은 사실 코틀린 진영에서는 코틀린 코루틴이라 하여 고유명사처럼 사용하고 있지만 언어론에서는 일반 명사입니다.

일반적으로 함수에 해당되는 루틴은 호출하면 반드시 반환되는 형태로 “호출 시 한 번 진입하고 반드시 한 번 반환되는” 함수인 셈입니다.
이에 비해 코루틴의 개념은 함수가 여러 번 반환될 수 있고 동시에 여러 번 진입할 수 있으며 루틴이 인자를 받아들이면서 진입되는 것처럼 여러 번 인자를 받아들일 수도 있습니다.

실제적으로는 언어에서 지정한 문을 사용해 코루틴을 작성하면 내부에서는 컨티뉴에이션(continuation)이라는 현재 호출된 함수의 상태를 기억하는 객체를 만들고 이 객체의 상태를 갱신하면서 switch문으로 분기하여 처리하는 방식이 주를 이룹니다.

  • 이 장을 비롯하여 이어지는 JS측의 구현을 살펴보는 장은 이해에 도움을 주기 위해 작성된 것으로 CPS에 대한 이해가 있으신 분들이나 JS코드가 부담스러운 분들은 바로 코틀린 제네레이터 장으로 넘어가셔도 됩니다.

예를들어 다음과 같은 자바스크립트의 제네레이터 구문을 생각해보죠.

const iterator = (function*(){
  yield 1;
  yield 2;
})();

function* 는 제네레이터를 만들어내는 특수한 문으로 이렇게 정의된 제네레이터를 호출함으로서 매번 새로운 이터레이터를 얻을 수 있습니다. 이제 다음과 같이 이터레이터를 반복시킬 수 있습니다.

iterator.next(); 
//{value:1, done:false}

iterator.next(); 
//{value:2, done:false}

iterator.next(); 
//{value:undefined, done:true}

하지만 이러한 묘기는 내부에서 어떻게 구현될 수 있을까요? 이를 알아보기 위해 바벨사이트에서 ES5로 컨버팅해봅니다.

var iterator =
/*#__PURE__*/
regeneratorRuntime.mark(function _callee() {
  return regeneratorRuntime.wrap(function _callee$(_context) { //1
    while (1) {
      switch (_context.prev = _context.next) { //2
        case 0: 
          _context.next = 2; //3
          return 1;

        case 2:
          _context.next = 4;
          return 2;

        case 4:
        case "end":
          return _context.stop();
      }
    }
  }, _callee);
})();

트랜스파일러의 출력물이라 사람이 읽기 불편하게 되어있습니디만 간단히 살펴보죠.

  1. 우선 //1의 인자를 보면 _context라는 게 들어옵니다.
  2. 이 _context가 바로 위에서 언급했던 컨티뉴에이션으로 코루틴의 현재 상태를 기억하는 객체입니다.
  3. //2 에서 컨티뉴에이션의 다음 단계를 진행시키면서 곧장 이를 이전 단계에 넣어줍니다.
  4. 실제로 이 next나 prev에 있는 것은 순차적으로 증가하는 숫자로 최초 0상태에서 next의 값을 변화시키는 것으로 다음 실행시 분기점이 달라지는 것입니다.
  5. //3은 최초 next의 상태가 0일 때 케이스에 들어와서 다음 next를 2로 변화시킵니다. 그리고 바로 return 1을 통해 반환됩니다.
  6. 다시 이 함수가 컨티뉴에이션 객체와 함께 다시 호출된다면 이번엔 case 2로 분기될 것입니다. 거기서 next는 4가 되며 2를 반환합니다.
  7. 마지막으로 end상태로 분기하며 컨티뉴에이션을 stop()을 통해 완전히 종료시킵니다.

즉 제네레이터 문법으로 간단히 작성한 코드는 내부적으로 컨티뉴에이션이라는 상태를 관리해주는 별도의 객체를 생성하고 이를 계속 인자로 보내 함수 내부의 코드를 격리한 스위치분기로 작동하는 것입니다. 만약 다음과 같이 지역변수를 사용한다면 어떻게 될까요?

const iterator = (function*(){
  let a = 3, b = 5;
  a = 10
  yield a;
  b = 10
  yield [a, b];
})();

위의 예는 처음과는 달리 지역 변수 a, b가 등장하고 순차적으로 이 상태도 변화하게 됩니다. 이 경우 지역변수인 a, b를 컨티뉴에이션의 속성으로 잡는 방법도 있고 컨티뉴에이션을 인자로 받는 함수의 자유변수로 선언하는 방법도 있습니다.
바벨의 경우는 상위 스코프의 자유변수로 잡아서 함수의 개별 호출 시에도 상태의 일관성을 보장하는 형태로 작동합니다.

regeneratorRuntime.mark(function _callee() {
  var a, b; // 여기!!!!!
  return regeneratorRuntime.wrap(function _callee$(_context) {
    while (1) {
      switch (_context.prev = _context.next) {
        case 0:
          a = 3, b = 5;
          a = 10;
          _context.next = 4;
          return a;

        case 4:
          b = 10;
          _context.next = 7;
          return [a, b];

        case 7:
        case "end":
          return _context.stop();
      }
    }
  }, _callee);
})();

이러한 실습에서 얻을 수 있는 것은 제네레이터 구문이 실제 컴파일시에 어떤 식으로 성립되는가에 대한 이해와 이 핵심에 단계별 상태를 진행해주는 컨티뉴에이션(_context)이 존재한다는 점입니다.
이제 한 단계 더 나아가 async와 제네레이터를 결합해볼 차례입니다.

JS의 async를 통한 비동기 컨티뉴에이션의 이해

코틀린도 표준 Promise 구현체가 있습니다만 JS는 Promise를 비동기제어를 위한 표준적인 프로토콜로 사용하고 있습니다.
async구문은 기본적으로 비동기에 대한 대기를 위해 내부적으로는 Promise.then을 활용합니다.

async iterators에 들어가기 앞 서 간단한 async의 내부적인 작동을 살펴보죠. 우선 일정시간 대기 후 진행되는 간단한 비동기 Promise를 만듭니다.

const delay = delay=>new Promise((resolve, _)=>setTimeout(resolve, delay));

이 함수는 인자로 delay를 밀리세컨으로 받아 타임아웃을 건 뒤 resolve해주는 간단한 setTimeout의 Promise래퍼입니다.
이제 이를 이용하여 async함수를 구현해보죠.

const test = async()=>{
  await delay(1000);
  console.log("1초 경과");
  await delay(1000);
  console.log("2초 경과");
};

test();
//1초 경과
//2초 경과

실행해보면 위와 같이 비동기적으로 await를 통해 Promise를 대기하고 resolve된 후에나 그 밑에 문장이 실행되는 것을 알 수 있습니다. 이것은 무슨 마법인가 바벨로 알아보죠. 이번엔 좀 코드가 길기 때문에 나눠서 설명합니다. 우선 async함수 본체는 다음과 같이 번역됩니다.

var test = function () {
  var _ref = _asyncToGenerator(
  regeneratorRuntime.mark(function _callee() { //1
    return regeneratorRuntime.wrap(function _callee$(_context) {
      while (1) {
        switch (_context.prev = _context.next) {
          case 0:
            _context.next = 2;
            return delay(1000);

          case 2:
            console.log("1초 경과");
            _context.next = 5;
            return delay(1000);

          case 5:
            console.log("2초 경과");

          case 6:
          case "end":
            return _context.stop();
        }
      }
    }, _callee); //2
  }));
  return function test() {
    return _ref.apply(this, arguments);
  };
}();

복잡해보이면 좀 시야를 넓게 갖고 보면 //1 에서부터 //2까지가 제네레이터 때와 완전히 동일하게 번역되었다는 것을 알 수 있습니다. 달라진 점은 이것을 감싸는 코드가 추가되었다는 점입니다. 그러한 관점에서 위의 코드를 요약하면 다음과 같을 것입니다.

var test = function () {
  var _ref = _asyncToGenerator(
  regeneratorRuntime.mark(function _callee(){
    return 기존 제네레이터 번역 함수
  }));
  return function test() {
    return _ref.apply(this, arguments);
  };
}();

즉 컨티뉴에이션을 이용한 제네레이터이긴 하지만 이를 다시 _asyncToGenerator로 감싼 함수를 만들어 이를 호출하는 형식으로 반환한 것입니다. 따라서 제네레이터와 다른 부분은 _asyncToGenerator 인 것이죠. 이 함수는 다음과 같습니다.

function _asyncToGenerator(fn) { 
  return function () { 
    var self = this, args = arguments; 
    return new Promise(function(resolve, reject){ //1
      var gen = fn.apply(self, args); 
      function _next(value) { 
        asyncGeneratorStep(gen, resolve, reject, _next, _throw, "next", value); 
      } 
      function _throw(err) { 
        asyncGeneratorStep(gen, resolve, reject, _next, _throw, "throw", err); 
      } 
      _next(undefined); 
    }); 
  }; 
}

어려운 부분은 대충 보고 핵심만 보자면 //1 에서 Promise로 감싸고 내부에서 asyncGeneratorStep함수를 통해 다음 단계를 진행하는 함수를 생성해 호출합니다. 즉

regeneratorRuntime.wrap → _asyncToGenerator → asyncGeneratorStep

의 단계로 컨티뉴에이션 콜을 호출할 책임을 위임하고 있습니다. 모든 것을 받아 처리하는 asyncGeneratorStep의 내부는 다음과 같습니다.

function asyncGeneratorStep(gen, resolve, reject, _next, _throw, key, arg) { 
  try { 
    var info = gen[key](arg); 
    var value = info.value; 
  } catch (error) { 
    reject(error); return; 
  } 
  if (info.done) { 
    resolve(value); 
  } else { 
    Promise.resolve(value).then(_next, _throw); //1
  } 
}

간단히 말해 //1에서 Promise.resolve가 _next를 진행하도록 하고 있습니다. 이 지점으로 인해 다시 컨티뉴에이션이 진행되고 다시 다음 Promise에서 then까지 대기를 타는 형태로 정리되는 것이죠.

지금까지 JS의 async함수가 디컴파일된 결과를 갖고 개념을 익혀봤는데 정리하면 다음과 같습니다.

  1. async함수 내의 await키워드를 기준으로 컨티뉴에이션 분기를 나눈 코드로 바꾼다.
  2. 컨티뉴에이션의 진행은 Promise.then 에서 일어난다.
  3. 이를 통해 코드 상에서는 동기적으로 표현되지만 실제적으로는 컨티뉴에이션 분기를 통해 비동기적인 코드 블록으로 나뉘어 실행된다.

제네레이터와 async를 결합한 async iterators

위에서 등장한 제네레이터와 async는 서로 다른 기능을 갖고 있습니다.

  1. 제네레이터는 yield를 기준으로 컨티뉴에이션이 분기되지만
  2. async는 await를 기준으로 컨티뉴에이션이 분기된다.
  3. 제네레이터는 비동기적으로 컨티뉴에이션을 진행할 수 없다.
  4. async는 내부에서 직접 컨티뉴에이션을 수행하므로 외부에서 코루틴 진출입을 할 수 없다.

즉 제네레이터는 코루틴이지만 동기밖에 안되고 async는 비동기적이지만 루틴 밖에 안된다고 할 수 있습니다.
이를 보완한 것이 바로 async iterators로 비동기적이면서 코루틴이 되도록 컨티뉴에이션 분기를 하는 것입니다.

이것도 우선 코드로 보죠.

const delay = delay=>new Promise((resolve, _)=>setTimeout(resolve, delay));

const test = async function*(){
  let i = 0;
  do{
    await delay(1000);
    yield i++;
  }while(true);
};

const a = test();

a.next().then(console.log)
//1초 경과 후 {value:0, done:false}

a.next().then(console.log)
//또 1초 경과 후 {value:1, done:false}

JS의 async는 내부적으로 Promise로 구현되므로 이전 async에서는 사용할 수 없었던 yield를 사용할 수 있게 되고 외부에서는 then을 통해 받을 수 있습니다.

이 경우의 바벨 출력은 다음과 같습니다.

var test = function () {
  var _ref = _wrapAsyncGenerator(
  regeneratorRuntime.mark(function _callee() {
    var i;
    return regeneratorRuntime.wrap(function _callee$(_context) {
      while (1) {
        switch (_context.prev = _context.next) {
          case 0:
            i = 0;
          case 1:
            _context.next = 3;
            return _awaitAsyncGenerator(delay(1000));
          case 3:
            _context.next = 5; 
            return i++;
          case 5:
            if (true) { //1
              _context.next = 1; //2
              break;
            }
          case 6:
          case "end":
            return _context.stop();
        }
      }
    }, _callee);
  }));
  return function test() {
    return _ref.apply(this, arguments);
  };
}();

//1이 while의 조건이 되고 //2를 통해 컨티뉴에이션의 상태를 다시 1로 되돌려 계속 루프가 일어나게 만듭니다.

결국 원래 제네레이터와 async함수의 컴파일된 결과는 크게 다르지 않았으므로 외부에서 next를 호출할 수 있도록 컨티뉴에이션을 약간만 손보면 되는 일입니다.

  • 지금까지 설명한 모든 자바스크립트 코드는 바벨 온라인 사이트에서 직접 카피해 넣어서 컴파일된 코드를 보실 수 있습니다.

async Iterator로부터 다음과 같은 내용을 요약해볼 수 있습니다.

  1. 컨티뉴이에션은 yield를 통해 코루틴으로 작동할 수 있으며 진출입시 그 형태는 정해진 값 뿐만 아니라 Promise등의 반제어 비동기 객체로 반환할 수도 있다.
  2. 컨티뉴이에션이 내부에서 next로 진행될 때 동기적인 실행 뿐만 아니라 비동기적인 실행기를 이용할 수도 있다.

하지만 자바스크립트의 특성 상 좀 더 단순화 되어있는 부분도 있습니다.

  1. 비동기 처리기를 비롯하여 비동기 객체는 Promise만 사용하도록 강제된다.
  2. 싱글 쓰레드 기반이므로 뮤텍스처리를 하지 않는다.
  3. 기본적인 처리는 메인쓰레드의 이벤트 루프에서 처리되며, 통신 등의 별도 쓰레드를 쓰는 경우에도 내부에 내장된 동작으로 진행될 뿐 유저가 쓰레드 정책에 관여하지 않는다.

이제 본격적으로 코틀린 코루틴으로 들어가보죠.

싱글 쓰레드 멀티태스킹과 컨티뉴에이션 인터셉터

싱글 쓰레드를 사용하는 경우에는 동시성이나 공유된 변수 상태등을 신경쓸 필요가 없기 때문에 상대적으로 구현하기 쉽습니다. JS나 파이썬 등 많은 언어에서는 쓰레드모델을 지원하지는 않지만 이 방식을 통해 유사 쓰레드를 제공하죠. 예를 들어 자바스크립트의 setTimeout이나 setInterval을 비롯하여 fetch와 같은 통신 모듈까지도 최종적인 결과를 언제나 메인쓰레드에서 처리하도록 강제합니다(따라서 동시성 문제가 발생하지 않습니다)

하지만 코루틴에서 관심사는 컨티뉴에이션의 진행처리입니다. 즉 컨티뉴에이션이 다음 단계로 진행시켜 실행하는 대상 쓰레드가 무엇인가를 알아야만 멀티쓰레드를 지원하는 코루틴 입장에서 동시성에 대한 지원 여부를 결정할 수 있을 것입니다.

코틀린 설계자는 이러한 고민의 결과로 한 단계 더 나아가 각 컨티뉴에이션을 전진시켜 실행하는 개념을 인터페이스로 정의했습니다. 하나의 CPS 진행은 결국 컨티뉴에이션의 여러 단계로 구분되고 각 단계의 컨티뉴에이션이 중지될 때마다 이 이벤트를 가로채 적절한 시점에 재개시는 일종의 인터셉터라는 개념이죠.

이를 ContiuationInterceptor라고 합니다.

다음 코드는 싱글쓰레드에서 컨티뉴에이션 인터셉터가 작동하도록 JVM기반으로 구현된 코드예제로 공식 저장소에서 찾을 수 있습니다(아래 코드에서 newSingleThreadContext를 호출하게 되면 실질적으로 하나의 쓰레드에서 풀링을 처리하므로 단일 쓰레드 멀티태스킹이 되는 셈입니다)

package context

import java.util.concurrent.*
import java.util.concurrent.atomic.*
import kotlin.concurrent.*
import kotlin.coroutines.*

fun newFixedThreadPoolContext(nThreads: Int, name: String) = ThreadContext(nThreads, name)
fun newSingleThreadContext(name: String) = ThreadContext(1, name)

class ThreadContext(
    nThreads: Int,
    name: String
) : AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {
    val threadNo = AtomicInteger()
    
    val executor: ScheduledExecutorService = Executors.newScheduledThreadPool(nThreads) { target ->
        thread(start = false, isDaemon = true, name = name + "-" + threadNo.incrementAndGet()) {
            target.run()
        }
    }

    override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
        ThreadContinuation(continuation.context.fold(continuation) { cont, element ->
            if (element != this@ThreadContext && element is ContinuationInterceptor)
                element.interceptContinuation(cont) else cont
        })

    private inner class ThreadContinuation<T>(val cont: Continuation<T>) : Continuation<T>{
        override val context: CoroutineContext = cont.context

        override fun resumeWith(result: Result<T>) {
            executor.execute { cont.resumeWith(result) }
        }
    }
}

위에 등장하는 ThreadContext클래스를 살펴보겠습니다. 우선 이 클래스는

  1. AbstractCoroutineContextElement를 상속하고
  2. ContinuationInterceptor를 구상합니다.

우선 추상클래스의 경우는 코루틴 컨텍스트 엘리먼트로 작동하게 합니다. 코루틴 컨텍스트는 나중에 익히게 될 개념인데 코루틴 상의 컨티뉴에이션이 공유할 상태나 객체를 의미합니다. 컨티뉴에이션은 당연히 인터셉터를 알아야하므로 모든 인터셉터의 구상클래스는 코루틴 컨텍스트 엘리먼트를 상속하지 않으면 실제로 사용할 수 없습니다.

실제 인터셉터에서 중요한 추상 메소드는 interceptContinuation으로 각 컨티뉴에이션이 해당되는 단계를 실행후 정지될 때 감지하여 호출됩니다.

천천히 위에서 아래로 구현되어있는 코드를 보면

val threadNo = AtomicInteger()
val executor: ScheduledExecutorService

이 두 개의 변수는 인자로 받아온 갯수만큼을 쓰레드 풀로 만들고 각 풀링용 쓰레드에게 고유번호를 부여하기 위해 AtomicInteger()를 사용합니다.

override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
  ThreadContinuation(continuation.context.fold(continuation) { cont, element ->
     if (element != this@ThreadContext && element is ContinuationInterceptor)
        element.interceptContinuation(cont) else cont
  })

이제 중요한 인터셉터용 메소드를 살펴보죠. (당연하게도!) 컨티뉴에이션이 인자로 들어오며 이를 폴딩하고 있습니다.
폴딩의 내용을 차근차근 보면

  1. 최초 acc로 컨티뉴에이션을 설정했고
  2. 받아온 컨티뉴에이션이 갖고 있는 모든 컨텍스트를 순회하면 처리하고 있습니다.
  3. 컨텍스트는 일종의 엘리먼트 컬렉션이므로 각 엘리먼트를 돌면서 확인하는 절차를 밟고 있죠(컨텍스트는 나중에 =.=)
  4. 중요한 건 각 엘리먼트가 ContinuationInterceptor인 경우에는 interceptContinuation()를 호출해 반환하고
  5. 그 외의 일반적인 엘리먼트의 경우는 컨티뉴에이션을 그대로 반환합니다.

코드의 의미는 그렇지만 실제로는 어떤 의미일까요?

앞 서 설명드린대로 인터셉터가 실제로 코루틴에 반영되려면 반드시 컨텍스트 엘리먼트가 되어야합니다. 컨티뉴에이션은 실행 시의 컨텍스트를 알고 있으므로 이 안의 엘리먼트 중 다른 인터셉터가 있다면 지금 만들고 있는 ThreadContext로 감싸버릴 생각인 것입니다. 이렇게 정리된 컨티뉴에이션은 그냥 컨티뉴에이션이 아니라 이너클래스로 정의된 ThreadContinuation 이 됩니다.

private inner class ThreadContinuation<T>(val cont: Continuation<T>) : Continuation<T>{
  override val context: CoroutineContext = cont.context

  override fun resumeWith(result: Result<T>) {
    executor.execute { cont.resumeWith(result) }
  }
}

컨티뉴에이션은 상속시에 반드시 context와 resumeWith를 구현해야 합니다.

  1. 일반적으로 컨텍스트는 인자로 받은 직전 컨티뉴에이션의 컨텍스트를 그대로 사용합니다.
  2. resumeWith메소드는 실제 컨티뉴에이션이 재개될 때 호출되는 메소드로 내부에서는 컨티뉴에이션의 resumeWith를 호출해줍니다.

이 일반적인 컨티뉴에이션 구현에서 ThreadContinuation의 핵심은 executor.execute를 통해 컨티뉴에이션의 resumeWith를 호출한다는 점입니다. 결국 일관성있는 쓰레드풀링 실행기에서 컨티뉴에이션의 resumeWith를 호출해주는 것이죠.

이제 각 컨티뉴에이션을 인터셉터로 어떻게 실행할지(resumeWith)를 알게 되었으므로 실제 컨티뉴에이션을 구성할 suspend블록을 만들어볼 차례입니다.

suspendCoroutine을 이용하여 CompletableFuture감싸기

자바스크립트의 async나 제네레이터는 정해진 문법으로 작성하면 내부적으로 알아서 컨티뉴에이션 섹션을 구분한 코드로 컴파일됩니다. 예를 들어 async는 await문을 기준으로, 제네레이터는 yield를 기준으로 컨티뉴에이션 상태 분기를 해주죠.
하지만 코틀린 코루틴은 특정 언어나 플랫폼에 종속성을 갖지 않도록 추상적인 레벨에서 정의되있기 때문에 이러한 문을 도입해서 내부적으로 특정 객체를 만들거나 분리할 수 없습니다.

대신 어떤 플랫폼에서도 특정 부분이 정지점이라는 것을 알 수 있는 키워드인 suspend와 이 suspend가 컨티뉴에이션에 참여해서 행동하는 것을 규정하는 suspendCoroutine함수를 제공합니다.
실제 suspendCoroutine에 전달할 람다에 인자로 들어오는 컨티뉴에이션의 resume은 플랫폼 구현체에 따라 다른 타이밍에 호출하게 될 것입니다.

예를들어 자바스크립트라면 다음과 같이 suspend함수를 구현해볼 수 있습니다.

suspend fun delay(ms:Int) = suspendCoroutine{cont->
  window.setTimeout({cont.resume(it)}, ms)
}

이 suspend함수는 setTimeout으로 인자로 받은 ms만큼의 대기를 한 뒤 컨티뉴에이션의 resume을 호출해 그 이후를 진행하도록 합니다. 같은 개념으로 자바의 CompletableFuture를 이용한다면 다음과 같이 작성할 수 있을 것입니다.

fun <T> future(context:CoroutineContext, block:suspend()->T): CompletableFuture<T> =
        CompletableFutureCoroutine<T>(context).also{block.startCoroutine(completion = it)}

class CompletableFutureCoroutine<T>(override val context: CoroutineContext) : CompletableFuture<T>(), Continuation<T> {
    override fun resumeWith(result: Result<T>) {
        result
            .onSuccess { complete(it) }
            .onFailure { completeExceptionally(it) }
    }
}

우선 아래쪽에 구현된 CompletableFutureCoroutine를 먼저보면 직접 Continuation을 구상하여 resumeWith함수를 작성했습니다. resumeWith가 외부에서 불릴 때 result가 성공값으로 들어오면 Future를 complete로 보내고 실패로 오면 예외로 보내는 코드로 되어있습니다. 이 스타일은 컨티뉴에이션을 직접 인자로 받지 않기 때문에 result를 기반으로 반응하게 하는 스타일입니다. 이에 비해 앞의 ThreadContinuation에서는 생성자에서 받은 컨티뉴에이션에게 직접 resumeWith하도록 지시하는 스타일이었죠.

그렇기 때문에 future함수는 별도의 컨티뉴에이션을 전달받지 않고 즉시 CompletableFuture를 반환하는 함수가 될 수 있습니다. 즉 suspendCoroutine{cont->…} 함수가 필요없는 거죠. 컨티뉴에이션 구현시 이 두 가지 스타일이 다 사용되므로 눈여겨 볼 필요가 있습니다. 함수의 실질적인 역할은 CompletableFuture내에서 block.startCoroutine(completion = it)을 시키는 부분입니다.
모든 suspend 블록은 이 확장함수를 사용할 수 있는데 결국 하나의 컨티뉴에이션을 실행시키고 그게 끝나면 보고할 다음 컨티뉴에이션을 인자로 보내는 것입니다. 실제 이 확장 함수의 내부는 다음과 같습니다.

public fun <T> (suspend () -> T).startCoroutine(
    completion: Continuation<T>
) {
    createCoroutineUnintercepted(completion).intercepted().resume(Unit)
}

있는 그대로 해석하자면 현재 컨티뉴에이션에서 다음 타자 컨티뉴에이션을 지정한 컨티뉴에이션을 생성합니다. 이로서 컨티뉴에이션의 체인을 만든 셈이죠. this->complete 로 컨티뉴에이션 체인을 만들 때 인터셉터를 배제한 순수한 상태의 컨티뉴에이션을 만들게 합니다. 그리고 여기에 인터셉터를 다시 걸어주고 최종적으로 컨티뉴에이션을 재개하는 거죠.
한 마디로 startCoroutine() 확장함수는 현재의 컨티뉴에이션을 발동하여 다음 컨티뉴에이션으로 연결해주는 함수입니다.

그 결과 suspend블록 내부의 내용이 실행되고 그 안에서 컨티뉴에이션 resume이 발생하면 complete에 지정되어있던 다음 컨티뉴에이션이 실행될 것입니다.

이제 실험에 사용할 suspend 블록(커스텀 컨티뉴에이션)을 만들었으니 await를 구현할 차례입니다.

await 구현

우선 await가 왜 필요한가를 이해할 필요가 있습니다.
컨티뉴에이션이 담고 있는 블록은 각자의 사정으로 움직이고 있습니다. 코틀린 코루틴은 JS처럼 구문을 해석해서 임의의 컨티뉴에이션 섹션을 자동으로 만들어주는 방식이 아니라 사용자가 직접적으로 컨티뉴에이션 객체를 생성하는 형태이므로 코드 속에 나오는 문은 그저 평범하게 순차처리되는 문일 뿐입니다.
따라서 그 평범한 문이 중지점을 갖으려면 기존 실행 중인 컨티뉴에이션이 완료되는 것을 대기한 뒤에 진행할 수 있는 특수한 형태가 필요합니다.

예를 들어 자바의 CompletableFuture를 생각해보죠. then, thenApplayAsync등 본인 안에서 플로우가 진행되는 것은 괜찮죠. 하지만 이 퓨쳐와 대화해야하는 외부 코드에서는 퓨처의 비동기적인 플로우에 관여할 수 없으므로 보조를 맞출 수 없습니다.
방법은 또 다른 퓨처를 만들어 남은 외부 코드를 감싸 퓨처로 결합하던가 아니면 get()메소드를 통해 퓨처의 종료를 대기하는 수 밖에 없을 것입니다.

이와 동일한 일이 suspend함수 내부에서도 일어납니다. suspend함수는 컨티뉴에이션을 처리하는 능력은 있지만 그렇다고 그 안에 코드들을 강제로 재작성하여 임의의 컨티뉴에이션을 만들어내는 기능은 없습니다.
그저 퓨쳐처럼 또다른 suspend함수들을 호출할 때 현재 컨티뉴에이션을 전달해주는 정도입니다.
따라서 퓨처의 get()처럼 명시적으로 각 컨티뉴에이션의 종결을 기다릴 방법이 필요합니다.
하지만 이를 직접 제공해주는 마법같은 기능은 없으므로 각 사정에 맞춰 한 번 더 suspend로 감싸서 컨티뉴에이션의 진행을 통제하는 수 밖에 없습니다.

suspend fun <T> CompletableFuture<T>.await(): T =
    suspendCoroutine<T> { cont: Continuation<T> ->
        whenComplete { result, exception ->
            if (exception == null) // the future has been completed normally
                cont.resume(result)
            else // the future has completed with an exception
                cont.resumeWithException(exception)
        }
    }

이 await확장함수는 CompletableFuture.whenComplete 시점에 외부의 컨티뉴에이션이 진행되도록 강제합니다.
이제 suspend블록 내에서 CompletableFuture를 사용할 때는 get이 아니라 await로 외부의 컨티뉴에이션을 진행시킬 수 있을 것입니다.

컨텍스트 하의 코루틴 실행

여기까지의 학습을 충실히 따라오셨다면 결국 코루틴은

  1. 컨티뉴에이션을 링크드 리스트 처럼 연결해둔 것이고,
  2. 각 컨티뉴에이션은 suspend 블록으로 생성되며
  3. 개별 컨티뉴에이션은 반드시 다음 컨티뉴에이션을 알고 있으며
  4. 컨티뉴에이션이 하는 최종적인 일은 다음 컨티뉴에이션의 resume을 호출하는 것

이라는 사실을 알 수 있습니다. 즉 코루틴이라고 말하고 있지만 이건 전체적인 솔루션의 이름이나 마찬가지로 실질적으로 라이브러리에서 사용되는 Coroutine이라는 이름이 들어있는 대부분의 클래스들은 컨티뉴에이션의 래퍼인 셈입니다.

또 한 가지 알 수 있는 것은 이렇게 컨티뉴에이션을 정성껏 준비한 뒤 실행하는 것입니다. 모든 컨티뉴에이션은 반드시 컨텍스트를 물고 실행됩니다. 이 컨텍스트에는 컨티뉴에이션에 공유해야 할 여러 상태를 Element라는 객체에 감싸서 불변객체로 넣어둘 수 있습니다. 기본적으로 컨텍스트는 엘리먼트의 컬렉션인 셈입니다.
하지만 컨티뉴에이션들 사이에 공유된다는 점에서 이번 컨티뉴에이션 체인에 참가하는 모든 컨티뉴에이션에게 공통적으로 처리해야할 무언가는 죄다 Element로 만들어서 컨텍스트에 넣는 전략을 쓰고 있습니다.
이러한 응용 중 표준으로 제공되는 것인 인터셉터입니다.
컨텍스트에 들어온 인터셉터가 있다면 이 인터셉터가 컨티뉴에이션의 실행 전에 미리 resume타이밍을 가로채 뭔가를 할 수 있습니다. 마치 리액트의 미들웨어와 비슷한 원리입니다.

앞에서는 ThreadContext를 만들어보면서 학습했던 내용입니다.

그럼 이제 모든 재료를 합쳐 실제 작동하는 컨티뉴에이션을 만들 수 있습니다.

fun main(args: Array<String>) {
    log("시작")
    val context = newSingleThreadContext("MyEventThread")
    val f = future(context) {
        log("Hello, world!")
        val f1 = future(context) {
            log("f1 is sleeping")
            delay(1000) // sleep 1s
            log("f1 returns 1")
            1
        }
        val f2 = future(context) {
            log("f2 is sleeping")
            delay(1000) // sleep 1s
            log("f2 returns 2")
            2
        }
        log("I'll wait for both f1 and f2. It should take just a second!")
        val sum = f1.await() + f2.await()
        log("And the sum is $sum")
    }
    f.get()
    log("종료")
}

우선 newSingleThreadContext(“MyEventThread”) 를 통해 싱글쓰레드 기반의 인터셉터를 만들었습니다.
이제 이것을 컨텍스트로 future함수를 실행합니다. future함수는 CompletableFuture를 반환하므로 이후 f.get()을 통해 완료를 대기시킬 수 있습니다. 이걸 대기시키지 않으면 메인쓰레드가 즉시 종료되기 때문에 필수입니다.

future에 전달된 블록의 내용을 찬찬히 보죠.
우선 f1, f2를 다시 future함수를 통해 만들어냅니다.
그러면 f1, f2는 CompletableFuture이자 컨티뉴에이션입니다. 또한 future함수는 즉시 startCoroutine()을 수행하므로 이미 각 블록은 실행을 시작합니다.
이는 일반적인 CompletableFuture를 두 개 쓸 때와 크게 다르지 않은 상황입니다.
이제 sum에서 결과를 취합하기 위해 바로 직전에 정의했던 await()확장 함수를 쓰고 있습니다.
f1, f2는 컨티뉴에이션이므로 await에서 resume을 통해 전개되어 값을 반환할 때까지 코루틴대기를 타게 되는 것입니다.
개념 상으로는 sum = f1.get() + f2.get()과 비슷합니다.
하지만 코틀린 코루틴은 컨티뉴에이션을 실행하고 resume이 호출될 때까지 차단적으로 블록킹하지 않고 감시 이벤트큐로 보내고 해당 쓰레드는 또 다른 컨티뉴에이션의 실행에 사용하게 됩니다. 이에 비해 get()은 현재 쓰레드를 블록킹하면서 대기하게 되므로 적어도 현재 쓰레드는 f1, f2가 종료될 때까지 블록킹됩니다. 바로 이 점이 코틀린 코루틴이 효율적인 부분인거죠.

결론

이번 시간에는 코틀린의 기초 개념을 이루는 컨티뉴에이션, 컨텍스트와 엘리먼트, 인터셉터의 개념과 구현 예제를 살펴보았습니다. 다음 시간에는 보다 본격적인 제네레이터응용, 컨텍스트 시스템, 서스펜션의 제약 같은 주제를 다루도록 하겠습니다.

%d 블로거가 이것을 좋아합니다: