This commit is contained in:
Михаил Капелько
2023-12-31 19:58:55 +03:00
parent 576f67e882
commit b67ed0c593
6 changed files with 70 additions and 84 deletions

View File

@@ -0,0 +1,31 @@
import Combine
public extension Bus {
final class Async<Src, Dst> {
let v = PassthroughSubject<Src, Never>()
var subscriptions = [AnyCancellable]()
public init(
_ handler: @escaping ((Src) -> Dst?),
_ src: String,
_ dst: String
) {
// Вход.
Bus.receiveSync(
[src],
{ [weak self] _, v in self?.v.send(v) },
&subscriptions
)
// Выход.
Bus.sendSync(
dst,
v
.compactMap { (v: Src) in handler(v) }
// Асинхронно.
.receive(on: DispatchQueue.main)
.eraseToAnyPublisher(),
&subscriptions
)
}
}
}

View File

@@ -1,5 +1,5 @@
public extension Bus {
/// Пропускаем далее предоставленный ключ.
/// Пропускаем далее единственный ключ.
static func convertKeyValue<T>(
_ key: String,
_ v: (key: String, value: Any)
@@ -13,7 +13,7 @@ public extension Bus {
return (key, value)
}
/// Пропускаем далее предоставленные ключи.
/// Пропускаем далее множество ключей.
static func convertKeyValue<T>(
_ keys: Set<String>,
_ v: (key: String, value: Any)
@@ -26,19 +26,4 @@ public extension Bus {
}
return (v.key, value)
}
/// Обрабатываем.
static func processKeysValue<Src, Dst>(
_ v: (key: String, value: Any),
_ keysIn: Set<String>,
_ handler: @escaping ((Src) -> Dst?)
) -> Dst? {
guard
keysIn.contains(v.key),
let vIn = v.value as? Src
else {
return nil
}
return handler(vIn)
}
}

View File

@@ -1,23 +0,0 @@
import Combine
public extension Bus {
final class Processor<Src, Dst> {
var subscriptions = [AnyCancellable]()
public init(
_ handler: @escaping ((Src) -> Dst?),
_ keyIn: String,
_ keyOut: String
) {
Bus.processSync([keyIn], keyOut, handler, &subscriptions)
}
public init(
_ handler: @escaping ((Src) -> Dst?),
_ keysIn: Set<String>,
_ keyOut: String
) {
Bus.processSync(keysIn, keyOut, handler, &subscriptions)
}
}
}

View File

@@ -0,0 +1,29 @@
import Combine
public extension Bus {
final class Sync<Src, Dst> {
let v = PassthroughSubject<Src, Never>()
var subscriptions = [AnyCancellable]()
public init(
_ handler: @escaping ((Src) -> Dst?),
_ src: String,
_ dst: String
) {
// Вход.
Bus.receiveSync(
[src],
{ [weak self] _, v in self?.v.send(v) },
&subscriptions
)
// Выход.
Bus.sendSync(
dst,
v
.compactMap { (v: Src) in handler(v) }
.eraseToAnyPublisher(),
&subscriptions
)
}
}
}

View File

@@ -10,7 +10,7 @@ public enum Bus {
}
public extension Bus {
/// Асинхронно обрабатывать входящие события из шины.
/// Асинхронно обрабатываем входящие события из шины.
static func receiveAsync<T>(
_ keys: Set<String>,
_ handler: @escaping ((String, T) -> Void),
@@ -23,7 +23,7 @@ public extension Bus {
.store(in: &subscriptions)
}
/// Синхронно обрабатывать входящие события из шины.
/// Синхронно обрабатываем входящие события из шины.
static func receiveSync<T>(
_ keys: Set<String>,
_ handler: @escaping ((String, T) -> Void),
@@ -35,19 +35,10 @@ public extension Bus {
.store(in: &subscriptions)
}
/// Асинхронно отправлять события из узла в шину.
static func sendAsync<T>(
_ key: String,
_ node: AnyPublisher<T, Never>,
_ subscriptions: inout [AnyCancellable]
) {
node
.receive(on: DispatchQueue.main)
.sink { v in e.send((key, v)) }
.store(in: &subscriptions)
}
/// Синхронно отправлять события из узла в шину.
/// Синхронно отправляем события из узла в шину.
///
/// Для асинхронной отправки достаточно добавить оператор `receive(on:)`
/// в цепочке параметра `node`
static func sendSync<T>(
_ key: String,
_ node: AnyPublisher<T, Never>,
@@ -58,35 +49,8 @@ public extension Bus {
.store(in: &subscriptions)
}
/// Синхронно отправить событие в шину один раз.
/// Единоразово синхронно отправляем событие в шину.
static func send(_ key: String, _ value: Any) {
e.send((key, value))
}
}
public extension Bus {
static func processAsync<Src, Dst>(
_ keysIn: Set<String>,
_ keyOut: String,
_ handler: @escaping ((Src) -> Dst?),
_ subscriptions: inout [AnyCancellable]
) {
e
.compactMap { processKeysValue($0, keysIn, handler) }
.receive(on: DispatchQueue.main)
.sink { vOut in Self.e.send((keyOut, vOut)) }
.store(in: &subscriptions)
}
static func processSync<Src, Dst>(
_ keysIn: Set<String>,
_ keyOut: String,
_ handler: @escaping ((Src) -> Dst?),
_ subscriptions: inout [AnyCancellable]
) {
e
.compactMap { processKeysValue($0, keysIn, handler) }
.sink { vOut in Self.e.send((keyOut, vOut)) }
.store(in: &subscriptions)
}
}