Михаил Капелько 10 months ago
parent
commit
0f76c99c95
1 changed files with 20 additions and 38 deletions
  1. +20
    -38
      Modules/BusX/Bus/src/Bus.swift

+ 20
- 38
Modules/BusX/Bus/src/Bus.swift View File

@@ -2,32 +2,17 @@ import Combine
import Foundation

public enum Bus {
static let e = PassthroughSubject<(key: String, value: Any), Never>()
/**/static var subscriptions = [AnyCancellable]()
private static let e = PassthroughSubject<(key: String, value: Any), Never>()

public static var events: AnyPublisher<(key: String, value: Any), Never> {
e.eraseToAnyPublisher()
}
}

public extension Bus {
enum Option {
case async
}

static var events: AnyPublisher<(key: String, value: Any), Never> {
Self.e.eraseToAnyPublisher()
}
}

private extension Bus {
static func subscribe(
_ subscription: AnyCancellable?,
_ sub: UnsafeMutablePointer<[AnyCancellable]>?
) {
guard let subscription else { return }
if let sub = sub {
sub.pointee.append(subscription)
} else {
Self.subscriptions.append(subscription)
}
}
}

public extension Bus {
@@ -35,52 +20,50 @@ public extension Bus {
_ keys: Set<String>,
_ handler: @escaping ((String, T) -> Void),
opt: [Option] = [],
sub: UnsafeMutablePointer<[AnyCancellable]>? = nil
sub subscriptions: inout [AnyCancellable]
) {
var subscription: AnyCancellable?
let isAsync = opt.contains(.async)

// Async.
if isAsync {
subscription = Self.events
Self.events
.compactMap { convertKeyValue(keys, $0) }
.receive(on: DispatchQueue.main)
.sink { v in handler(v.0, v.1) }
.store(in: &subscriptions)
}

// Async.
if !isAsync {
subscription = Self.events
Self.events
.compactMap { convertKeyValue(keys, $0) }
.sink { v in handler(v.0, v.1) }
.store(in: &subscriptions)
}

subscribe(subscription, sub)
}

static func send<T>(
_ key: String,
_ node: AnyPublisher<T, Never>,
opt: [Option] = [],
sub: UnsafeMutablePointer<[AnyCancellable]>? = nil
sub subscriptions: inout [AnyCancellable]
) {
var subscription: AnyCancellable?
let isAsync = opt.contains(.async)

// Async.
if isAsync {
subscription = node
node
.receive(on: DispatchQueue.main)
.sink { v in Self.e.send((key, v)) }
.store(in: &subscriptions)
}

// Sync.
if !isAsync {
subscription = node
node
.sink { v in Self.e.send((key, v)) }
.store(in: &subscriptions)
}

subscribe(subscription, sub)
}
static func send(_ key: String, _ value: Any) {
@@ -94,26 +77,25 @@ public extension Bus {
_ keyOut: String,
_ handler: @escaping ((Src) -> Dst?),
opt: [Option] = [],
sub: UnsafeMutablePointer<[AnyCancellable]>? = nil
sub subscriptions: inout [AnyCancellable]
) {
var subscription: AnyCancellable?
let isAsync = opt.contains(.async)

// Async.
if isAsync {
subscription = Self.events
Self.events
.compactMap { processKeysValue($0, keysIn, handler) }
.receive(on: DispatchQueue.main)
.sink { vOut in Self.e.send((keyOut, vOut)) }
.store(in: &subscriptions)
}

// Sync.
if !isAsync {
subscription = Self.events
Self.events
.compactMap { processKeysValue($0, keysIn, handler) }
.sink { vOut in Self.e.send((keyOut, vOut)) }
.store(in: &subscriptions)
}

subscribe(subscription, sub)
}
}

Loading…
Cancel
Save