blog

Promise "キットのソースコードを読む

Guaranteeは、タスクの結果が成功した場合にのみ適用されることを意味します。 Guaranteeの実装を分析しましょう。 簡単に言うと、Guaranteeは保留中のイベントのセットを管理し、結果...

Jan 15, 2021 · 14 min. read
シェア
  • Promiseは、タスクが成功または失敗(結果の値がエラー)の2つの結果のいずれかで実行できることを示します。
  • 保証とは、タスクの実行結果が成功のみであることを意味します。

GuaranteeはThenableプロトコルを実装し、PromiseはThenable, CatchMixinプロトコルを実装しています。

まず、ギャランティの実装を分析することから始めましょう。

Guarantee

簡単に言うと、ギャランティは保留中のイベントのセットを管理する責任があり、結果を受け取ると、その結果を使用して保留中のタスクのセットを呼び出します。

Guaranteeで処理されるイベントは、以下のように定義されたHandlersクラスによって管理されます:

final class Handlers<R> {
 var bodies: [(R) -> Void] = []
 func append(_ item: @escaping(R) -> Void) { bodies.append(item) }
}

ご覧のように、ハンドラは内部的に処理されるイベントの配列です。

保証には2つの状態があります:

  1. 未処理ステータス:処理中のタスクが含まれます。
  2. 処理済み状態:処理タスクの結果値が含まれます。

この2つの状態を表すのがシーラントで、以下のように定義されています:

enum Sealant<R> {
 case pending(Handlers<R>)
 case resolved(R)
}

PromiseKit はスレッドセーフです。Guaranteeの状態を変更したり、保留中のイベントを追加したりするときには、スレッドセーフの問題を考慮する必要があります。そこで、GuaranteeのSealantの状態を取得したり変更したりするときは、スレッドセーフである必要があります。 この問題を解決するために、Sealantの取得と設定の処理はBoxにカプセル化されます:

class Box<T> {
 func inspect() -> Sealant<T> { fatalError() } // 現在の状態を取得する
 func inspect(_: (Sealant<T>) -> Void) { fatalError() } // シーラントの取得と変更
 func seal(_: T) {} // は指定された値に設定され、処理される。
}

BoxはPromiseKitで2つのサブクラスを持っています:

  • SealedBox:結果値で初期化され、解決された状態のみが返されます。
  • EmptyBox : 初期化され、結果値はまだありません。
final class SealedBox<T>: Box<T> {
 let value: T
 init(value: T) {
 self.value = value
 }
 // SealedBox inspect()をオーバーライドしない。_: (Sealant<T>) -> Void)  
 // というのも、SealedBoxはこのメソッドを呼ぶべきではないからだ。Sealantがパディングされている場合にのみ呼び出される必要があるからだ。
 override func inspect() -> Sealant<T> {
 return .resolved(value)
 }
}
class EmptyBox<T>: Box<T> {
 private var sealant = Sealant<T>.pending(.init())
 private let barrier = DispatchQueue(label: "org.promisekit.barrier", attributes: .concurrent)
 // 結果値を設定する
 // 1. 状態を変更する .pending -> .resolved
 // 2. 保留中のタスクを処理する
 override func seal(_ value: T) {
 var handlers: Handlers<T>!
 barrier.sync(flags: .barrier) {
 guard case .pending(let _handlers) = self.sealant else {
 return // already fulfilled!
 }
 handlers = _handlers
 self.sealant = .resolved(value)
 }
 //FIXME we are resolved so should `pipe(to:)` be called at this instant, "thensは順番に呼び出される” would be invalid
 //NOTE we don上記の thisを実行しない `sync` because that could potentially deadlock
 //THOUGH since `then` etc. typically invoke after a run-loop cycle, this issue is somewhat less severe
 if let handlers = handlers {
 handlers.bodies.forEach{ $0(value) }
 }
 //TODO solution is an unfortunate third state "sealed” where then's get added
 // to a separate handler pool for that state
 // any other solution has potential races
 }
 override func inspect() -> Sealant<T> {
 var rv: Sealant<T>!
 barrier.sync {
 rv = self.sealant
 }
 return rv
 }
 override func inspect(_ body: (Sealant<T>) -> Void) {
 var sealed = false
 barrier.sync(flags: .barrier) {
 switch sealant {
 case .pending:
 // body will append to handlers, so we must stay barrier 
 body(sealant)
 case .resolved:
 sealed = true
 }
 }
 if sealed {
 // we do this outside the barrier to prevent potential deadlocks
 // it's safe because we never transition away from this state
 // 潜在的なデッドロックを避けるために、この操作をバリアの外で実行する。
 // PMKNamespacerは次のように定義されている。
 body(sealant)
 }
 }
}

pipe(to: @escaping(Result<T>) -> Void) GuaranteeはBoxインスタンスを管理します。保証の状態は、box.seal(:T) メソッドを呼び出すことで変更され、保留中のタスクが処理されます。

ギャランティの実装を簡単に説明すると、保留中のタスクを配列に格納し、結果が得られるまで待機し、保留中のタスクを実行し、保留中のタスクのリストをクリアし、得られた結果値を保存します。

コールチェーン

Guaranteeの結果値を渡すために、Guaranteeは、then、mapなど、連鎖を容易にする多くのメソッドで拡張されています。

map

以下はマップ拡張メソッドの実装です:

func map<U>(on: DispatchQueue? = conf.Q.map,
 flags: DispatchWorkItemFlags? = nil,
 _ body: @escaping(T) -> U) -> Guarantee<U> {
 let rg = Guarantee<U>(.pending)
 pipe { value in
 on.async(flags: flags) {
 rg.box.seal(body(value))
 }
 }
 return rg
}

mapでは、現在のGuaranteeに新しいタスクが追加されます。それは、値が取得されたときにボディを通して処理した後、新しく作成されたGuaranteeインスタンスrgの結果値に値を設定することです。

then

thenメソッドの実装は以下のように定義されています:

@discardableResult
func then<U>(on: DispatchQueue? = conf.Q.map, 
 flags: DispatchWorkItemFlags? = nil, 
 _ body: @escaping(T) -> Guarantee<U>) -> Guarantee<U> {
 let rg = Guarantee<U>(.pending)
 pipe { value in
 on.async(flags: flags) {
 body(value).pipe(to: rg.box.seal)
 }
 }
 return rg
}

この拡張メソッドには3つの保証があります。

  • thenメソッドの現在のギャランティ
  • 保証が作成され、then メソッドで返されます。
  • ボディクロージャーで返却される保証

3つのGuaranteeの関係は、thenメソッドを呼び出すことで、保留中のタスクを現在のGuaranteeに追加する、つまり、bodyを呼び出してGuaranteeを取得し、thenメソッドによって返されたGuaranteeを完了するタスクを追加する、というものです。

したがって、thenメソッドで返されるGuaranteeは、ボディから返されるGuaranteeが処理されるまで待つ必要があり、ボディから返されるGuaranteeは、現在のGuaranteeが処理されるまで待つ必要があります。

PromiseKitは、さまざまな処理スタイルに対応するために、さまざまなメソッドを作成します。異なるメソッド名を持つことで、アプリケーションの可読性をコードレベルで向上させることができます。

Promise

Guaranteeとは対照的に、Promiseの実行結果は成功かもしれませんし、例外が発生するかもしれません:

  • 約束には、成功と失敗という2種類の結果があります。
  • Promise は Thenable プロトコルだけでなく、例外を処理するための CatchMixin プロトコルも実装しています。

Box in Promiseのジェネリック型はResultです。 Resultは以下のように定義されています:

public enum Result<T> {
 case fulfilled(T)
 case rejected(Error)
}

CatchMixin 例外処理

CatchMixin を拡張することで、以下のように定義された catch メソッドが提供されます:

public extension CatchMixin {
 @discardableResult
 func `catch`(on: DispatchQueue? = conf.Q.return, flags: DispatchWorkItemFlags? = nil, policy: CatchPolicy = conf.catchPolicy, _ body: @escaping(Error) -> Void) -> PMKFinalizer {
 let finalizer = PMKFinalizer()
 pipe {
 switch $0 {
 case .rejected(let error):
 guard policy == .allErrors || !error.isCancelled else {
 fallthrough
 }
 on.async(flags: flags) {
 body(error)
 finalizer.pending.resolve(())
 }
 case .fulfilled:
 finalizer.pending.resolve(())
 }
 }
 return finalizer
 }
}
public class PMKFinalizer {
 let pending = Guarantee<Void>.pending()
 /// `finally` is the same as `ensure`, but it is not chainable
 public func finally(on: DispatchQueue? = conf.Q.return, flags: DispatchWorkItemFlags? = nil, _ body: @escaping () -> Void) {
 
 pending.guarantee.done(on: on, flags: flags) {
 body()
 }
 
 }
}

catch メソッドでは、body パラメータを指定してエラーを処理するハンドラを渡し、PMKFinalizer インスタンスを返します:

  1. コールチェーン終了
  2. プロミスの実行の成否に関係なく呼び出されるクロージャを渡します。

例外が発生した場合、エラーによって回復を試みることもできます:

func recover<U: Thenable>(on: DispatchQueue? = conf.Q.map, flags: DispatchWorkItemFlags? = nil, policy: CatchPolicy = conf.catchPolicy, _ body: @escaping(Error) throws -> U) -> Promise<T> where U.T == T {
 let rp = Promise<U.T>(.pending)
 pipe {
 switch $0 {
 case .fulfilled(let value):
 rp.box.seal(.fulfilled(value))
 case .rejected(let error):
 if policy == .allErrors || !error.isCancelled {
 on.async(flags: flags) {
 do {
 let rv = try body(error)
 guard rv !== rp else { throw PMKError.returnedSelf }
 rv.pipe(to: rp.box.seal)
 } catch {
 rp.box.seal(.rejected(error))
 }
 }
 } else {
 rp.box.seal(.rejected(error))
 }
 }
 }
 return rp
}

特徴的なメソッド

wait、hang -- 非同期から同期へ

PromiseKitは非同期プロシージャに対応するために使用されますが、テストを容易にするために、非同期を同期に変換して結果の値を簡単に取得できるようにしたい場合があります。以下はwaitメソッドの実装です:

/**
Blocks this thread, so-thread "シリアルで thisを呼び出してはいけない。
any part of your chain may use. Like the main thread for example.
*/
func wait() throws -> T {
 if Thread.isMainThread {
 conf.logHandler(LogEvent.waitOnMainThread)
 }
 var result = self.result
 if result == nil {
 let group = DispatchGroup()
 group.enter()
 pipe { result = $0; group.leave() }
 group.wait()
 }
 switch result! {
 case .rejected(let error):
 throw error
 case .fulfilled(let value):
 return value
 }
}

waitメソッドは、以下のように実装されたDispatchGroupによって非同期を同期に変換します:

waitメソッドに加え、以下のように定義されたグローバル・メソッドhangが用意されています:

public func hang<T>(_ promise: Promise<T>) throws -> T {
 #if os(Linux) || os(Android)
 
 #if swift(>=4.2)
 let runLoopMode: CFRunLoopMode = kCFRunLoopDefaultMode
 #else
 // isMainThread is not yet implemented on Linux.
 let runLoopModeRaw = RunLoopMode.defaultRunLoopMode.rawValue._bridgeToObjectiveC()
 let runLoopMode: CFString = unsafeBitCast(runLoopModeRaw, to: CFString.self)
 #endif
 
 #else
 guard Thread.isMainThread else {
 // hang doesn't make sense on threads that aren't the main thread.
 // use `.wait()` on those threads.
 fatalError("Only call hang() on the main thread.")
 }
 let runLoopMode: CFRunLoopMode = CFRunLoopMode.defaultMode
 #endif
 
 if promise.isPending {
 var context = CFRunLoopSourceContext()
 let runLoop = CFRunLoopGetCurrent()
 let runLoopSource = CFRunLoopSourceCreate(nil, 0, &context)
 CFRunLoopAddSource(runLoop, runLoopSource, runLoopMode)
 
 _ = promise.ensure {
 CFRunLoopStop(runLoop)
 }
 
 while promise.isPending {
 CFRunLoopRun()
 }
 CFRunLoopRemoveSource(runLoop, runLoopSource, runLoopMode)
 }
 
 switch promise.result! {
 case .rejected(let error):
 throw error
 case .fulfilled(let value):
 return value
 }
}

ハングはRunLoopの原則を使用して非同期から同期を実装します。

実は、RunLoopはこのような関数で、内部にdo-whileループを持っています。CFRunLoopRun()を呼び出すと、スレッドはタイムアウトするか手動で停止するまでループに留まり、その時点で関数が戻ります。

実現プロセスは

RunLoopについての詳細は以下の記事をご覧ください:

blog.ibireme.com/2015/05/18/...blog.devtang.com/2012/06/24/...

after 指定された時間待ちます

実行前に指定された時間だけ待つ必要がある場合もありますが、そのような場合はGuaranteeを素早く返すことができます。

/**
 after(seconds: 1.5).then {
 // 
 }
- Returns: A guarantee that resolves after the specified duration.
*/
public func after(seconds: TimeInterval) -> Guarantee<Void> {
 let (rg, seal) = Guarantee<Void>.pending()
 let when = DispatchTime.now() + seconds
#if swift(>=4.0)
 q.asyncAfter(deadline: when) { seal(()) }
#else
 q.asyncAfter(deadline: when, execute: seal)
#endif
 return rg
}
/**
 after(.seconds(2)).then {
 // 
 }
 - Returns: A guarantee that resolves after the specified duration.
*/
public func after(_ interval: DispatchTimeInterval) -> Guarantee<Void> {
 let (rg, seal) = Guarantee<Void>.pending()
 let when = DispatchTime.now() + interval
#if swift(>=4.0)
 q.asyncAfter(deadline: when) { seal(()) }
#else
 q.asyncAfter(deadline: when, execute: seal)
#endif
 return rg
}

これは、DispatchQueueのasyncAfterメソッドを通してafterメソッド内で行われます。

firstly

読みやすくするために、PromiseKitはfirstlyメソッドを提供しています:

// 使用しない場合
URLSession.shared.dataTask(url: url1).then {
 URLSession.shared.dataTask(url: url2)
}.then {
 URLSession.shared.dataTask(url: url3)
}
// まず
firstly {
 URLSession.shared.dataTask(url: url1)
}.then {
 URLSession.shared.dataTask(url: url2)
}.then {
 URLSession.shared.dataTask(url: url3)
}

firstlyは以下のように実装されています:

public func firstly<U: Thenable>(execute body: () throws -> U) -> Promise<U.T> {
 do {
 let rp = Promise<U.T>(.pending)
 try body().pipe(to: rp.box.seal)
 return rp
 } catch {
 return Promise(error: error)
 }
}
/// - See: firstly()
public func firstly<T>(execute body: () -> Guarantee<T>) -> Guarantee<T> {
 return body()
}

race

レース・メソッドは、複数の非同期プログラムの最初の実行の終了結果を取得する必要がある場合に使用できます。レース・メソッドにはいくつかのバリエーションがあるので、そのうちの1つを見てみましょう:

public func race<T>(_ guarantees: Guarantee<T>...) -> Guarantee<T> {
 let rg = Guarantee<T>(.pending)
 for guarantee in guarantees {
 guarantee.pipe(to: rg.box.seal)
 }
 return rg
}

新しい Guarantee が race に作成され、この新しく作成された Guarantee の結果を設定する保留中のプロシージャが、競合するすべての Guarantee に追加されます。Guaranteeは最初に設定された時の結果値のみを保存し、それ以降に設定された結果値は無視されるので、新しく作成されたGuaranteeは最初に実行されたGuaranteeの結果値を保存します。

when

複数の非同期ハンドラの結果を同時に取得する必要があり、すべてのハンドラが終了するまで待つ必要があることがあります。whenメソッドにはいくつかのバリエーションがあります:

private func _when<U: Thenable>(_ thenables: [U]) -> Promise<Void> {
 var countdown = thenables.count
 guard countdown > 0 else {
 return .value(Void())
 }
 let rp = Promise<Void>(.pending)
#if PMKDisableProgress || os(Linux)
 var progress: (completedUnitCount: Int, totalUnitCount: Int) = (0, 0)
#else
 let progress = Progress(totalUnitCount: Int64(thenables.count))
 progress.isCancellable = false
 progress.isPausable = false
#endif
 let barrier = DispatchQueue(label: "org.promisekit.barrier.when", attributes: .concurrent)
 for promise in thenables {
 promise.pipe { result in
 barrier.sync(flags: .barrier) {
 switch result {
 case .rejected(let error):
 if rp.isPending {
 progress.completedUnitCount = progress.totalUnitCount
 rp.box.seal(.rejected(error))
 }
 case .fulfilled:
 guard rp.isPending else { return }
 progress.completedUnitCount += 1
 countdown -= 1
 if countdown == 0 {
 rp.box.seal(.fulfilled(()))
 }
 }
 }
 }
 }
 return rp
}
public func when<U: Thenable>(fulfilled thenables: [U]) -> Promise<[U.T]> {
 return _when(thenables).map(on: nil) { thenables.map{ $0.value! } }
}

核となるメソッドは_whenで、これは新しいPromiseを生成し、待つべきPromiseが実行を終えるたびに、まだ実行を終えていないPromiseがあるかどうかをチェックし、それらがすべて実行を終えていれば、新しく生成されたPromiseの結果値をセットするように設計されています。

ヒント

リゾルバは必要に応じて警告を出力

Promise には初期化メソッドがあります:

/// - Returns: a tuple of a new pending promise and its `Resolver`.
public class func pending() -> (promise: Promise<T>, resolver: Resolver<T>) {
 return { ($0, Resolver($0.box)) }(Promise<T>(.pending))
}

リゾルバは以下のように定義されています:

/// An object for resolving promises
public final class Resolver<T> {
 let box: Box<Result<T>>
 init(_ box: Box<Result<T>>) {
 self.box = box
 }
 deinit {
 if case .pending = box.inspect() {
 conf.logHandler(.pendingPromiseDeallocated)
 }
 }
}

ResolverはBoxをレイヤーで包んでいるように見えますが、なぜそんなことをするのですか?

PromiseKit では、リリース時に Promise のステータスが未処理の場合、コンソールに警告メッセージが出力され、問題がある可能性があることをユーザに知らせます。

このデザインの主な目的は、このニーズを満たすことだと思います。

なぜサブクラスを使わないのですか?なぜこの判断をBoxクラスに置かないのですか?

システムAPIの拡張に関する注意事項

指定したスレッドで簡単にGuaranteeを作成できるようにするために、作者はDispatchQueueを拡張しました。しかし、この拡張は他のライブラリから、あるいは将来AppleのAPIから名前が変更される可能性があり、トラブルシューティングが難しいバグにつながる可能性があります。 PromiseKitのアプローチは、 メソッドに固有の型のパラメータを追加することです。 パラメータをメソッドに追加します。コードは次のようになります:

public extension DispatchQueue {
 /**
 Asynchronously executes the provided closure on a dispatch queue.
 DispatchQueue.global().async(.promise) {
 md5(input)
 }.done { md5 in
 // 
 }
 - Parameter body: The closure that resolves this promise.
 - Returns: A new `Guarantee` resolved by the result of the provided closure.
 - Note: There is no Promise/Thenable version of this due to Swift compiler ambiguity issues.
 */
 @available(macOS 10.10, iOS 2.0, tvOS 10.0, watchOS 2.0, *)
 final func async<T>(_: PMKNamespacer, group: DispatchGroup? = nil, qos: DispatchQoS = .default, flags: DispatchWorkItemFlags = [], execute body: @escaping () -> T) -> Guarantee<T> {
 let rg = Guarantee<T>(.pending)
 async(group: group, qos: qos, flags: flags) {
 rg.box.seal(body())
 }
 return rg
 }
}

ここでPMKNamespacerは以下のように定義されます:

/// used by our extensions to provide unambiguous functions with the same name as the original function
public enum PMKNamespacer {
 case promise
}
Read next