|
|
@@ -10,92 +10,83 @@ public enum Bus { |
|
|
|
} |
|
|
|
|
|
|
|
public extension Bus { |
|
|
|
enum Option { |
|
|
|
case async |
|
|
|
/// Асинхронно обрабатывать входящие события из шины. |
|
|
|
static func receiveAsync<T>( |
|
|
|
_ keys: Set<String>, |
|
|
|
_ handler: @escaping ((String, T) -> Void), |
|
|
|
_ subscriptions: inout [AnyCancellable] |
|
|
|
) { |
|
|
|
Self.events |
|
|
|
.compactMap { convertKeyValue(keys, $0) } |
|
|
|
.receive(on: DispatchQueue.main) |
|
|
|
.sink { v in handler(v.0, v.1) } |
|
|
|
.store(in: &subscriptions) |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
public extension Bus { |
|
|
|
static func receive<T>( |
|
|
|
|
|
|
|
/// Синхронно обрабатывать входящие события из шины. |
|
|
|
static func receiveSync<T>( |
|
|
|
_ keys: Set<String>, |
|
|
|
_ handler: @escaping ((String, T) -> Void), |
|
|
|
opt: [Option] = [], |
|
|
|
sub subscriptions: inout [AnyCancellable] |
|
|
|
_ subscriptions: inout [AnyCancellable] |
|
|
|
) { |
|
|
|
let isAsync = opt.contains(.async) |
|
|
|
|
|
|
|
// Async. |
|
|
|
if isAsync { |
|
|
|
Self.events |
|
|
|
.compactMap { convertKeyValue(keys, $0) } |
|
|
|
.receive(on: DispatchQueue.main) |
|
|
|
.sink { v in handler(v.0, v.1) } |
|
|
|
.store(in: &subscriptions) |
|
|
|
} |
|
|
|
|
|
|
|
// Async. |
|
|
|
if !isAsync { |
|
|
|
Self.events |
|
|
|
.compactMap { convertKeyValue(keys, $0) } |
|
|
|
.sink { v in handler(v.0, v.1) } |
|
|
|
.store(in: &subscriptions) |
|
|
|
} |
|
|
|
Self.events |
|
|
|
.compactMap { convertKeyValue(keys, $0) } |
|
|
|
.sink { v in handler(v.0, v.1) } |
|
|
|
.store(in: &subscriptions) |
|
|
|
} |
|
|
|
|
|
|
|
static func send<T>( |
|
|
|
/// Асинхронно отправлять события из узла в шину. |
|
|
|
static func sendAsync<T>( |
|
|
|
_ key: String, |
|
|
|
_ node: AnyPublisher<T, Never>, |
|
|
|
opt: [Option] = [], |
|
|
|
sub subscriptions: inout [AnyCancellable] |
|
|
|
_ subscriptions: inout [AnyCancellable] |
|
|
|
) { |
|
|
|
let isAsync = opt.contains(.async) |
|
|
|
|
|
|
|
// Async. |
|
|
|
if isAsync { |
|
|
|
node |
|
|
|
.receive(on: DispatchQueue.main) |
|
|
|
.sink { v in Self.e.send((key, v)) } |
|
|
|
.store(in: &subscriptions) |
|
|
|
} |
|
|
|
|
|
|
|
// Sync. |
|
|
|
if !isAsync { |
|
|
|
node |
|
|
|
.sink { v in Self.e.send((key, v)) } |
|
|
|
.store(in: &subscriptions) |
|
|
|
} |
|
|
|
node |
|
|
|
.receive(on: DispatchQueue.main) |
|
|
|
.sink { v in Self.e.send((key, v)) } |
|
|
|
.store(in: &subscriptions) |
|
|
|
} |
|
|
|
|
|
|
|
/// Синхронно отправлять события из узла в шину. |
|
|
|
static func sendSync<T>( |
|
|
|
_ key: String, |
|
|
|
_ node: AnyPublisher<T, Never>, |
|
|
|
_ subscriptions: inout [AnyCancellable] |
|
|
|
) { |
|
|
|
node |
|
|
|
.sink { v in Self.e.send((key, v)) } |
|
|
|
.store(in: &subscriptions) |
|
|
|
} |
|
|
|
|
|
|
|
/// Синхронно отправить событие в шину один раз. |
|
|
|
static func send(_ key: String, _ value: Any) { |
|
|
|
Self.e.send((key, value)) |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
public extension Bus { |
|
|
|
static func process<Src, Dst>( |
|
|
|
static func processAsync<Src, Dst>( |
|
|
|
_ keysIn: Set<String>, |
|
|
|
_ keyOut: String, |
|
|
|
_ handler: @escaping ((Src) -> Dst?), |
|
|
|
opt: [Option] = [], |
|
|
|
sub subscriptions: inout [AnyCancellable] |
|
|
|
_ subscriptions: inout [AnyCancellable] |
|
|
|
) { |
|
|
|
let isAsync = opt.contains(.async) |
|
|
|
|
|
|
|
// Async. |
|
|
|
if isAsync { |
|
|
|
Self.events |
|
|
|
.compactMap { processKeysValue($0, keysIn, handler) } |
|
|
|
.receive(on: DispatchQueue.main) |
|
|
|
.sink { vOut in Self.e.send((keyOut, vOut)) } |
|
|
|
.store(in: &subscriptions) |
|
|
|
} |
|
|
|
Self.events |
|
|
|
.compactMap { processKeysValue($0, keysIn, handler) } |
|
|
|
.receive(on: DispatchQueue.main) |
|
|
|
.sink { vOut in Self.e.send((keyOut, vOut)) } |
|
|
|
.store(in: &subscriptions) |
|
|
|
} |
|
|
|
|
|
|
|
// Sync. |
|
|
|
if !isAsync { |
|
|
|
Self.events |
|
|
|
.compactMap { processKeysValue($0, keysIn, handler) } |
|
|
|
.sink { vOut in Self.e.send((keyOut, vOut)) } |
|
|
|
.store(in: &subscriptions) |
|
|
|
} |
|
|
|
static func processSync<Src, Dst>( |
|
|
|
_ keysIn: Set<String>, |
|
|
|
_ keyOut: String, |
|
|
|
_ handler: @escaping ((Src) -> Dst?), |
|
|
|
_ subscriptions: inout [AnyCancellable] |
|
|
|
) { |
|
|
|
Self.events |
|
|
|
.compactMap { processKeysValue($0, keysIn, handler) } |
|
|
|
.sink { vOut in Self.e.send((keyOut, vOut)) } |
|
|
|
.store(in: &subscriptions) |
|
|
|
} |
|
|
|
} |