[swift] Thread와 Event

0

개요top

어떤 언어에서나 동시성 문제를 해결하는 것은 매우 까다롭습니다. 특히 병행성 문제는 동기화 문제와 함께 다양한 경우가 발생하므로 제공되는 저수준 API를 그대로 사용하면 너무 정교한 코드를 매번 짜야합니다.
이런 이유로 멀티쓰레드환경에서도 잘 알려진 디자인패턴이 존재합니다. 대표적으로 워커(Worker)패턴과 서스펜션(Suspension)패턴이 있습니다.
NSOperationQueue는 일종의 워커패턴 구현체로 인식할 수 있습니다. 특히 쓰레드풀링을 내장하고 있는 고수준의 클래스로 maxConcurrentOperationCount를 조정해주면 알아서 내부 쓰레드의 종료시점을 판단하여 큐에 있는 쓰레드의 시작 시점을 자동으로 조정합니다. 마치 자바의 Executors.newFixedThreadPool(3) 를 한 효과를 다음과 같은 코드로 이룰 수 있습니다.

let que = NSOperationQueue()
que.maxConcurrentOperationCount = 3

일단 OS레벨에서 고수준 쓰레드패턴 구현체를 제공해주므로 워커패턴은 이견없이 이것에 맡기자고 결정했습니다. 다음은 서스펜션의 문제인데, 짜피 넌블럭킹, 넌싱크를 위한 장치라면 시점 조정을 통해서 달성하는 편이 낫겠다 싶어서 각 쓰레드의 완료시점에서 호출되는 옵져버를 통해 해결하자는 결론에 도달했습니다. 옵져버의 채널을 다양화 하면 이벤트시스템이 됩니다.
이에 대한 OS구현체는 NSNotification이지만 이걸 쓰기엔 여러가지 문제가 있습니다. 일단 수신자가 정적 셀렉터로 확정되어야하는 점이 가장 큰 걸림돌입니다. 런타임에 유연한 이벤트시스템에는 적합하지 않은 거죠. 어쩔 수 없이 간단한 옵져버 구현체를 만들고 이를 통해 쓰레드 종료시점의 이벤트를 호출하는 식으로 정리해가기로 했습니다.

이벤트시스템top

일반적으로 이벤트시스템에 등장하는 등장인물은 옵져버패턴과 별다르지 않습니다만 관용적인 이름이 다릅니다.

  • 옵져버:Subject – 이벤트:Dispatcher
  • 옵져버:Observer – 이벤트:Listener
  • 옵져버:Info – 이벤트:Event
  • 옵져버:없음 – 이벤트:EventType

순수 옵져버 패턴에서는 채널링기능이 없으니 이벤트타입에 해당되는 개념은 없습니다. 하는 일은 대충 비슷합니다(개인적으로 다채널 옵져버는 이벤트라고 부르는건가 싶습니다^^)
이러한 이벤트시스템은 디스패처와 리스너 사이를 느슨하게 연결하여 런타임에 정의된 객체 간의 통신을 돕습니다. swift는 런타임에 클로저표현식을 통해 자유롭게 실행가능한 객체를 만들어낼 수 있으므로 매우 잘 어울린다고 할 수 있습니다.

Eventtop

우선 각 리스너에게 전달될 정보를 나타낼 Event클래스를 정의합니다.

class Event{

	let type:String
	let target:Any!
	let data:Any!

	var isStopPropagation:Bool = false
		
	init(_ type:String, _ target:Any?, _data:Any?){
		self.type = type
		self.target = target
		self.data = data
	}
}

우선 생성시점에 type과 target, data가 결정됩니다. target은 nil의 가능성이 있어 옵셔널입니다. 그외에 하나의 이벤트에 다수의 리스너를 등록할 수 있으므로 도중에 전파를 멈추고 싶다면 isStopPropagation 를 이용해 중지시킬 수 있게 합니다.

리스너를 클로져로 볼때의 문제점top

이제 Dispatcher에게 리스너를 등록하고 제거하는 메소드를 구현할 것인데 작은 걸림돌이 있습니다. 클로저 간의 동일성에 대한 비교를 할 방법이 없으므로 이미 있는 리스너인지 확인할 방법도 없고 중복된 리스너는 등록하지 않게 할 방법도 없습니다. 이를 위해 어쩔 수 없이 저수준 포인터를 이용하여 클로저간의 일치를 비교하는 연산자 오버라이딩을 구현해야합니다. 이 코드는 스택오버플로우로부터 검색해서 얻었습니다.
코드를 찬찬히 보니 Basic시절의 peek, poke를 보는 느낌이네요 ^^

func peek1<A,R>(f:A->R)->(fp:Int, ctx:Int) {
	typealias IntInt = (Int, Int)
	let (_, lo) = unsafeBitCast(f, IntInt.self)
	let offset = sizeof(Int) == 8 ? 16 : 12
	let ptr  = UnsafePointer<Int>(bitPattern:lo + offset)
	return (ptr.memory, ptr.successor().memory)
}
func === <A,R>(l:A->R,r:A->R)->Bool {
	let tl = peek1(l), tr = peek1(r)
	return tl.0 == tr.0 && tl.1 == tr.1
}

우선 간단히 인자를 하나만 받는 클로저에 대해 제네릭으로 구해 메모리주소를 얻어 두 클로저의 메모리가 일치하는지를 검사하는 간단한 로직입니다. 이제 ===를 통해 인자가 하나 있는 타입의 클로저는 비교할 수 있게 되었습니다.

Dispatcher 구현top

Dispatcher는 이벤트를 통지하고 리스너를 관리합니다. 하지만 상속모델로 구현하면 사용이 빡빡하므로 합성모델에 근거한다고 생각하면 본인이 디스패치의 중심이라기보다 위임받은 객체가 중심이 되는 것이 맞을 것입니다. 이외엔 다채널로 관리할 리스너 목록이 있습니다. 간단히 뮤터블딕셔너리로 선언하죠. 따라서 기본적인 타입과 생성자의 구조는 다음을 따릅니다.

typealias MDIC = NSMutableDictionary
typealias LISTENER = Event->()

class bsDispatcher{
	
	let target:Any?
	private var listeners:MDIC = MDIC()
	
	init(target:Any? = nil){
		self.target = target
	}

이제는 addListener를 구현할 차례입니다. 이미 ===를 통해 비교를 할 수 있게 되었으므로 매우 간단히 구현할 수 있습니다. 이벤트 타입에 해당되는 채널이 아직 생성되지 않았다면 만들어주고 거기에 차곡차곡 리스너를 넣어줍니다. 헌데 문제가 하나 있는데 복사문제를 피하기 위해 스위프트배열 대신 참조타입인 NSMutableArray를 쓰게 되는데 이 녀석은 요소로 참조타입만 받아주기 때문에 직접 클로저를 넣을 수 없습니다. 다음과 같은 상황인 거죠.

func addListener(type:String, _ listener:LISTENER){
	if listeners[type] == nil{
		listeners[type] = NSMutableArray()
	}
	//listeners[type]에 직접 listener를 넣을 수 없다!

따라서 참조형으로 래핑할 간단한 클래스가 필요합니다. Listener라는 클래스를 하나 정의하죠.

class Listener{
	let v:LISTENER

	init(_ v:LISTENER){
		self.v = v
	}
}

하는 일이라고는 리스너를 감싸 참조형으로 만들어줄 뿐인 간단한 형정의용 클래스입니다. 이제 이를 이용해 배열에 넣을 수 있게 되었습니다. 나머지 코드를 전개하죠.

func addListener(type:String, _ listener:LISTENER){
	if listeners[type] == nil{
		listeners[type] = NSMutableArray()
	}
	if let ls = listeners[type]{
		let arr = ls as! NSMutableArray
		//리스너가 같은지 확인한다!
		if !arr.contains({($0 as! Listener).v === listener}){
			arr.addObject(Listener(listener))
		}
	}
}

이제 간단히 removeListener도 구현할 수 있습니다.

func removeListener(type:String, _ listener:LISTENER? = nil){
	//리스너가 없음
	if listeners[type] == nil{
		return
	}
	var v = listeners[type] as! [Listener]

	//리스너를 안넘기면 해당 타입을 다 지움
	if listener == nil{
		v.removeAll()
	}else{ //아니면 찾아서 지움
		for i in 0...v.count{
			if v[i].v === listener!{
				v.removeAtIndex(i)
			}
		}
	}
}

약간의 꼼수라면 NSMutableArray는 swift배열로도 간단히 형변환해서 쓸 수 있기 때문에 [Listener]로 캐스팅하여 순환할때 형변환코드를 좀 줄였습니다.

마지막으로 이벤트를 통지하는 dispatch메소드를 구현하면 됩니다. dispatch시점에 Event에 추가적인 데이터를 전달할 수 있게 합니다.

func dispatch(type:String, data:Any? = nil){
	if let v = listeners[type]{
		let e = Event(type, target, data)
		for item in v as! [V]{
			item.v(e)
			if e.isStopPropagation{
				break;
			}
		}
	}
}

디스패치하는 채널의 배열을 꺼내 루프를 돌면서 이벤트를 전달하는 것이 끝입니다. 이제 클로저 기반의 느슨한 이벤트 연결 시스템을 얻게 되었습니다. 전체 이벤트 소스는 다음과 같습니다.

func peek1<A,R>(f:A->R)->(fp:Int, ctx:Int) {
	typealias IntInt = (Int, Int)
	let (_, lo) = unsafeBitCast(f, IntInt.self)
	let offset = sizeof(Int) == 8 ? 16 : 12
	let ptr  = UnsafePointer<Int>(bitPattern:lo + offset)
	return (ptr.memory, ptr.successor().memory)
}
func === <A,R>(l:A->R,r:A->R)->Bool {
	let tl = peek1(l), tr = peek1(r)
	return tl.0 == tr.0 && tl.1 == tr.1
}

extension SomeClass{
	typealias MDIC = NSMutableDictionary
	typealias LISTENER = Event->()

	class Event{
		let type:String
		let target:Any!
		let data:Any!
		var isStopPropagation:Bool = false
		
		init(_ type:String, _ target:Any?, _ data:Any?){
			self.type = type
			self.target = target
			self.data = data
		}
	}

	class Listener{
		let v:LISTENER

		init(_ v:LISTENER){
			self.v = v
		}
	}
}
extension SomeClass{
	class bsDispatcher{
	
		let target:Any?
		private var listeners:MDIC = MDIC()
	
		init(target:Any? = nil){
			self.target = target
		}
		func addListener(type:String, _ listener:LISTENER){
			if listeners[type] == nil{
				listeners[type] = NSMutableArray()
			}
			if let ls = listeners[type]{
				let arr = ls as! NSMutableArray
				if !arr.contains({($0 as! Listener).v === listener}){
					arr.addObject(Listener(listener))
				}
			}
		}
		func removeListener(type:String, _ listener:LISTENER? = nil){
			if listeners[type] == nil{
				return
			}
			var v = listeners[type] as! [Listener]
			if listener == nil{
				v.removeAll()
			}else{
				for i in 0...v.count{
					if v[i].v === listener!{
						v.removeAtIndex(i)
					}
				}
			}
		}
		func dispatch(type:String, data:Any? = nil){
			if let v = listeners[type]{
				let e = Event(type, target, data)
				for item in v as! [V]{
					item.v(e)
					if e.isStopPropagation{
						break;
					}
				}
			}
		}
	}
}

워커패턴 래핑top

후…이벤트를 설명하느라 진이 빠졌습니다만 원래 목적인 워커쓰레드패턴으로 드디어 돌아왔습니다.
우선 워커쓰레드를 관리할 큐를 정적 초기화를 통해 간단히 정의합니다.

static private var workerQue:NSOperationQueue = {
	let que = NSOperationQueue()
	que.maxConcurrentOperationCount = 3
	return que
}()

워낙 고수준 객체를 파운데이션이 제공하고 있어 할 일이 없습니다. 이제 정적으로 디스패처를 정의하고 쓰레드가 종료되는 시점에서 이벤트를 통지 받기 위해 간단히 이벤트타입을 하나 정의합니다.

static let THREAD_END = "THREAD_END"
static let dispatcher = Dispatch(nil)

남은건 간단한 큐의 래핑함수 뿐이죠. 쓰레드가 될 클로저형을 미리 정의할 건데 worker등록시 추가 데이터를 전달받을 수 있는 형태고 이를 준수하는 클로저를 쓰레드로 받아들입니다.

typealias THREAD = Any?->()

static func worker(data:Any?, thread:THREAD){
	workerQue.addOperationWithBlock{
		thread(data)
		dispatcher.dispatch(THREAD_END)
	}
}

모든 부속이 갖춰졌습니다. 이제 손쉽게 워커를 사용할 수 있게 되었습니다.

//완료 리스너 등록
dispatcher.addListener(THREAD_END){
	print("쓰레드완료, event:\($0.data)") //"test"출력
}
//워커발동
worker("test"){
	print("쓰레드진행, data:\($0)") //"test"출력
}

완료리스너가 호출되는 시점에 워커가 하는 일은 반드시 종결되어있으므로 병행 쓰레드임에도 동시접근을 방지하게 됩니다. 별도로 서스펜션쓰레드가 없어도 완료리스너에서 작업하는 이상 항상 안전하게 됩니다.
(물론 그 워커와 완료리스너 외에 다른 녀석이 둘 사이에 노리는 값을 건드리지 않는다는 가정이 필요하지만 그것은 서스펜션 패턴도 마찬가지입니다^^)

UI업데이트의 문제top

위에서 구현한 워커에서 mainUI를 건드리면 당연하게도 오류가 납니다. 이왕 래핑한거 메인UI용 쓰레드도 가볍게 래핑하여 함수로 제공하죠.

static private var mainQue = dispatch_get_main_queue()

static func mainUI(thread:()->Void){
	dispatch_sync(mainQue, thread)
}

간단하여 설명할게 없습니다. 이제 워커의 사용은 더욱 부드럽게 진행됩니다.

worker("test"){
	//쓰레드에서 할일
	mainUI{
		//UI업데이트
	}
}

결론top

동시성 문제와 로직간의 약결합을 위한 이벤트 시스템을 구현해보았습니다. 워커부분의 전체 소스는 아래와 같습니다.

extension SomeClass{
	typealias THREAD = Any?->()

	static let THREAD_END = "THREAD_END"
	static let dispatcher = Dispatcher(nil)
	static private let mainQue = dispatch_get_main_queue()

	static func mainUI(thread:()->Void){
		dispatch_sync(mainQue, thread)
	}

	static func worker(data:Any?, thread:THREAD){
		workerQue.addOperationWithBlock{
			thread(data)
			dispatcher.dispatch(THREAD_END)
		}
	}
}

Comments

comments