Learning how to think in pipelines with Combine

In this post, we’re going to follow an iterative, real-world example of how to solve a complex problem with Combine. You’ll also learn how to overcome the pitfalls of procedural thinking when designing Combine pipelines. Let’s get started.

How would you solve this problem?

You’re implementing a complex data loading system.

You have data sources A, B, and C to read from
Each needs to be connected/initialized before reading any data from it
To initialize B and C, you must read a configuration object from A
All the data sources are synced from a cloud service automatically when initialized, which could take a variable amount of time for each
An auth token is required to open the data sources, which must be fetched from a web service

With each of these requirements, the complexity grows. In a real project, these requirements may have been added over months and multiple shipping versions of the app. Without the full context from the start, accounting for the final complexity becomes very difficult.

An experienced reader may have already recognized these as asynchronous problems, which compounds the complexity further. We have to manage callbacks and dispatch queues to avoid blocking the main thread: tricky, but nothing too painful. You may even reach for operation queues, which would also help manage the dependencies between these data sources.
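For comparison, here is roughly what an operation-queue version could look like. It’s a minimal sketch, assuming hypothetical synchronous stand-ins (`fetchToken`, `openSource`, `readUserData`) for the real network and sync calls, with plain strings in place of the data-source types. `addDependency(_:)` is what enforces the ordering: A waits for the token, reading the configuration waits for A, and B and C wait for the configuration.

```swift
import Foundation

// Hypothetical synchronous stand-ins for the real network/sync calls.
func fetchToken() -> String { "token" }
func openSource(_ name: String, token: String, config: String? = nil) -> String {
    if let config = config { return "\(name)(\(token), \(config))" }
    return "\(name)(\(token))"
}
func readUserData(from source: String) -> String { "userData from \(source)" }

/// Runs the load steps on an OperationQueue, using dependencies to order them.
func loadWithOperations() -> [String: String] {
    let lock = NSLock()
    var results: [String: String] = [:]
    func store(_ key: String, _ value: String) {
        lock.lock(); defer { lock.unlock() }
        results[key] = value
    }
    func read(_ key: String) -> String {
        lock.lock(); defer { lock.unlock() }
        return results[key]! // safe only because dependencies guarantee the value exists
    }

    let tokenOp = BlockOperation { store("token", fetchToken()) }
    let aOp = BlockOperation { store("A", openSource("A", token: read("token"))) }
    let userOp = BlockOperation { store("userData", readUserData(from: read("A"))) }
    let bOp = BlockOperation {
        store("B", openSource("B", token: read("token"), config: read("userData")))
    }
    let cOp = BlockOperation {
        store("C", openSource("C", token: read("token"), config: read("userData")))
    }

    aOp.addDependency(tokenOp)
    userOp.addDependency(aOp)
    // B and C only need userOp; the token is already available transitively via aOp.
    bOp.addDependency(userOp)
    cOp.addDependency(userOp)

    let queue = OperationQueue()
    queue.qualityOfService = .userInitiated
    // Blocking wait keeps the sketch simple; real code would observe completion instead.
    queue.addOperations([tokenOp, aOp, userOp, bOp, cOp], waitUntilFinished: true)

    lock.lock(); defer { lock.unlock() }
    return results
}
```

Dependencies solve the ordering problem, but the shared dictionary, the lock, and the force-unwraps hint at how much bookkeeping this approach leaves to you.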

You can download the full Swift Playground and follow along. There are multiple pages, each corresponding to one of the steps below, and a Common.swift file that contains some of the convenience functions and type definitions used in these examples.

Simplicity is Key, Right?

In a naive, single-threaded case (or our glorious async/await future, but that’s another blog post), your code may look something like this:

// From page “01 – Sequential BG Queue”
func getAllDataSources(userName: String) -> MyDataSourceFacade {
    let token = getTokenFromServer()

    let A = getDataSourceA(token)

    let userData = A.getData(for: userName)

    let B = getDataSourceB(userData, token)
    let C = getDataSourceC(userData, token)

    return MyDataSourceFacade(userData, A, B, C)
}

You may notice one big thing that’s missing from this example: error handling. In reality, the code would be a bit more complex, but it would keep roughly the same structure.
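In Swift, that error handling would most likely take the form of throwing functions. The sketch below assumes every step can throw; the types are hypothetical, simplified stand-ins for the playground’s definitions, `openDataSource` is a made-up helper, and the `failing` parameter exists only to simulate an outage.

```swift
// Hypothetical, simplified stand-ins for the playground's types.
struct MyDataSource { let name: String }
struct MyUserData { let userName: String }
struct MyDataSourceFacade {
    let userData: MyUserData
    let sources: [MyDataSource]
}

enum LoadError: Error, Equatable {
    case sourceUnavailable(String)
}

// Hypothetical throwing calls; `failing` lets a caller simulate an outage.
func getTokenFromServer() throws -> String { "token" }

func openDataSource(_ name: String, token: String, userData: MyUserData? = nil,
                    failing: Set<String>) throws -> MyDataSource {
    if failing.contains(name) { throw LoadError.sourceUnavailable(name) }
    return MyDataSource(name: name)
}

/// Same sequential structure, but every step can fail and `try` propagates the error.
func getAllDataSources(userName: String, failing: Set<String> = []) throws -> MyDataSourceFacade {
    let token = try getTokenFromServer()
    let a = try openDataSource("A", token: token, failing: failing)
    let userData = MyUserData(userName: userName) // stands in for A.getData(for:)
    let b = try openDataSource("B", token: token, userData: userData, failing: failing)
    let c = try openDataSource("C", token: token, userData: userData, failing: failing)
    return MyDataSourceFacade(userData: userData, sources: [a, b, c])
}
```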

To get this off the main thread, we’d need something like the following:

// From page “01 – Sequential BG Queue”
DispatchQueue.global(qos: .userInitiated).async {
    let facade = getAllDataSources(userName: "Jim")

    DispatchQueue.main.async {
        print("done!")
        // do something with facade
    }
}

It’s a familiar pattern, but it’s brittle and prone to simple errors when adding functionality. It’s also very static. What if someone refactors the code and forgets to dispatch the work off of and back onto the main thread properly? What if the auth token expires and we need to start the process over?

A First Try with Combine

Thankfully, these things are much easier in a pipeline-oriented paradigm like Combine. A very natural way to update this for Combine is to replace the variables with Subjects or @Published properties, then fuse them all together like this:

class FacadeProvider {

    @Published private var token: String
    @Published private var A: MyDataSource
    @Published private var B: MyDataSource
    @Published private var C: MyDataSource
    @Published private var userData: MyUserData

    private var cancellables: [AnyCancellable] = []

    func getAllDataSources(userName: String) -> AnyPublisher<MyDataSourceFacade, Never> {

        cancellables = []

        getTokenPublisher()
            .logError()
            .subscribe(on: backgroundQueue)
            .assign(to: \.token, on: self)
            .store(in: &cancellables)

        $token
            .tryMap { getDataSourceA($0) }
            .logError()
            .subscribe(on: backgroundQueue)
            .assign(to: \.A, on: self)
            .store(in: &cancellables)

        $A
            .tryMap { $0.getData(for: userName) }
            .logError()
            .subscribe(on: backgroundQueue)
            .assign(to: \.userData, on: self)
            .store(in: &cancellables)

        let userAndTokenPub = $userData.combineLatest($token)

        userAndTokenPub
            .tryMap { getDataSourceB($0.0, $0.1) }
            .logError()
            .subscribe(on: backgroundQueue)
            .assign(to: \.B, on: self)
            .store(in: &cancellables)

        userAndTokenPub
            .tryMap { getDataSourceC($0.0, $0.1) }
            .logError()
            .subscribe(on: backgroundQueue)
            .assign(to: \.C, on: self)
            .store(in: &cancellables)

        return $userData.combineLatest($A, $B, $C)
            .map { (userData, A, B, C) -> MyDataSourceFacade in
                return MyDataSourceFacade(userData, A, B, C)
            }
            .subscribe(on: backgroundQueue)
            .receive(on: DispatchQueue.main)
            .eraseToAnyPublisher()
    }
}

This is a pretty direct translation from our naive example, and it’s easy to figure out what’s happening. I purposely chose this because it’s what those new to Combine will likely think to do when hearing about @Published, myself included. It’s a bit more verbose, but we constructed valid pipelines, logged errors (albeit with a helper function) and guaranteed the threading behavior we wanted.
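The `logError()` helper lives in the playground’s Common.swift, which isn’t reproduced in this post. One plausible shape, sketched here as an assumption rather than the author’s actual code, logs the failure and swaps in an `Empty` publisher, so the stream’s `Failure` type becomes `Never`, which is what `assign(to:on:)` requires. `ExampleError` is a hypothetical error type for the usage example.

```swift
import Combine

extension Publisher {
    /// Hypothetical helper: prints any error, then finishes quietly instead of failing.
    func logError(_ label: String = "Pipeline") -> AnyPublisher<Output, Never> {
        self.catch { error -> Empty<Output, Never> in
            print("\(label) failed: \(error)")
            return Empty(completeImmediately: true)
        }
        .eraseToAnyPublisher()
    }
}

struct ExampleError: Error {}

// Usage: a failing upstream now simply finishes, with no values and no error.
var received: [Int] = []
var finishedNormally = false
let subscription = Fail<Int, ExampleError>(error: ExampleError())
    .logError("Token")
    .sink(
        receiveCompletion: { if case .finished = $0 { finishedNormally = true } },
        receiveValue: { received.append($0) }
    )
```

The trade-off is visible in the usage: the error is printed, but anything downstream only sees a normal completion.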

Better? Or Worse…

However, I’ve glossed over a pretty big problem with this implementation: it doesn’t actually work. We’ve defined our properties as non-optional, so when we create this type, each property must contain a value. However, we don’t have initial values for these complex data types.
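Two properties of `@Published` are at play here: the wrapped property needs an initial value like any other stored property, and its projected publisher (`$property`) immediately emits the current value to each new subscriber. A small sketch with a hypothetical `TokenStore` type shows that an optional property therefore emits `nil` first, which downstream steps then have to filter out.

```swift
import Combine

final class TokenStore {
    // An optional @Published property can start out as nil...
    @Published var token: String? = nil
}

let store = TokenStore()
var seen: [String?] = []

// ...and subscribing to $token delivers that current nil right away.
let observation = store.$token.sink { seen.append($0) }

store.token = "abc123"
// seen now holds [nil, "abc123"]
```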

So let’s change this to actually work, using optional properties where needed:

// From page “02 – Combine First Try”
class FacadeProvider {

    @Published private var token: String?
    @Published private var A: MyDataSource?
    @Published private var B: MyDataSource?
    @Published private var C: MyDataSource?
    @Published private var userData: MyUserData?

    private var cancellables: [AnyCancellable] = []

    func getAllDataSources(userName: String) -> AnyPublisher<MyDataSourceFacade, Never> {

        cancellables = []

        getTokenPublisher()
            .logError()
            .subscribe(on: backgroundQueue)
            .assign(to: \.token, on: self)
            .store(in: &cancellables)

        $token
            .ignoreNil()
            .tryMap { getDataSourceA($0) }
            .logError()
            .subscribe(on: backgroundQueue)
            .assign(to: \.A, on: self)
            .store(in: &cancellables)

        $A
            .ignoreNil()
            .tryMap { $0.getData(for: userName) }
            .logError()
            .subscribe(on: backgroundQueue)
            .assign(to: \.userData, on: self)
            .store(in: &cancellables)

        let userAndTokenPub = $userData.ignoreNil().combineLatest($token.ignoreNil())

        userAndTokenPub
            .tryMap { getDataSourceB($0.0, $0.1) }
            .logError()
            .subscribe(on: backgroundQueue)
            .assign(to: \.B, on: self)
            .store(in: &cancellables)

        userAndTokenPub
            .tryMap { getDataSourceC($0.0, $0.1) }
            .logError()
            .subscribe(on: backgroundQueue)
            .assign(to: \.C, on: self)
            .store(in: &cancellables)

        return $userData.combineLatest($A, $B, $C)
            .compactMap { (userData, A, B, C) -> MyDataSourceFacade? in
                guard let userData = userData,
                      let A = A,
                      let B = B,
                      let C = C else {
                    return nil
                }

                return MyDataSourceFacade(userData, A, B, C)
            }
            .subscribe(on: backgroundQueue)
            .receive(on: DispatchQueue.main)
            .eraseToAnyPublisher()
    }
}

This is starting to get messy. Not to mention that our error handling could use some improvement. In this implementation, the caller of this function will never receive an Error, because the publisher returned to them is only connected to the @Published properties (whose Failure types are Never). This is a problem because if any setup step goes awry and the process needs to start over, the caller will just wait quietly for a value or error that will never come. That’s obviously not ideal.
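The `ignoreNil()` helper also comes from Common.swift; the sketch below assumes it is a thin wrapper around `compactMap`. It pairs that helper with a `logError()`-style step (a `catch` that logs and finishes) to reproduce the silent wait: once the upstream fails, the error stops at the `@Published` property, and a subscriber to `$token` never sees a completion. `TokenHolder`, `TokenError`, and the `PassthroughSubject` standing in for `getTokenPublisher()` are all hypothetical.

```swift
import Combine

extension Publisher {
    /// Assumed shape of the playground's helper: drop nil values and unwrap the rest.
    func ignoreNil<Wrapped>() -> Publishers.CompactMap<Self, Wrapped> where Output == Wrapped? {
        compactMap { $0 }
    }
}

enum TokenError: Error { case expired }

final class TokenHolder {
    @Published var token: String? = nil
}

/// Returns what a caller listening to `$token` saw after the upstream failed.
func observeSwallowedError() -> (tokens: [String], callerCompleted: Bool) {
    let holder = TokenHolder()
    let tokenSource = PassthroughSubject<String, TokenError>() // stands in for getTokenPublisher()

    // Bridge into the @Published property, logging and discarding any error.
    let bridge = tokenSource
        .map { Optional($0) }
        .catch { error -> Empty<String?, Never> in
            print("Token error: \(error)")
            return Empty()
        }
        .assign(to: \.token, on: holder)

    var tokens: [String] = []
    var callerCompleted = false
    let caller = holder.$token
        .ignoreNil()
        .sink(receiveCompletion: { _ in callerCompleted = true },
              receiveValue: { tokens.append($0) })

    tokenSource.send("token-1")
    tokenSource.send(completion: .failure(.expired)) // never reaches `caller`

    bridge.cancel()
    caller.cancel()
    return (tokens, callerCompleted)
}
```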

Wield the Pipeline(s)

The problem here is with how we’ve decided to model the problem with Combine. We did something that seemed natural to a developer who has worked almost exclusively with procedural code, which I’d bet is most of us in the iOS/Mac developer community. But that’s not what Combine is made for. We need to model this as a reactive stream: multiple signals that come together to give a complex output value.
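The operator that does most of the work in that kind of model is `combineLatest`: it waits until every input has produced at least one value, then emits a tuple of the latest values whenever any input changes. A quick sketch, with two `PassthroughSubject`s as hypothetical stand-ins for independent signals:

```swift
import Combine

let userData = PassthroughSubject<String, Never>()
let token = PassthroughSubject<String, Never>()
var outputs: [String] = []

let combined = userData.combineLatest(token)
    .map { "\($0) + \($1)" }
    .sink { outputs.append($0) }

userData.send("config-1") // nothing yet: token has no value
token.send("token-1")     // emits "config-1 + token-1"
token.send("token-2")     // emits "config-1 + token-2" (latest userData is reused)
```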

Here’s a more “Combine-flavored” solution:

// From page “03 – Combine Flavored”
func getAllDataSources(userName: String) -> AnyPublisher<MyDataSourceFacade, Error> {

    let tokenPub = getTokenPublisher()

    let APub = tokenPub
        .tryMap { getDataSourceA($0) }

    let userDataPub = APub
        .tryMap { $0.getData(for: userName) }

    let userAndTokenPub = userDataPub.combineLatest(tokenPub)

    let BPub = userAndTokenPub
        .tryMap { getDataSourceB($0.0, $0.1) }

    let CPub = userAndTokenPub
        .tryMap { getDataSourceC($0.0, $0.1) }

    return userDataPub.combineLatest(APub, BPub, CPub)
        .compactMap { (userData, A, B, C) -> MyDataSourceFacade? in
            print("Returning facade")
            return MyDataSourceFacade(userData, A, B, C)
        }
        .subscribe(on: backgroundQueue)
        .receive(on: DispatchQueue.main)
        .eraseToAnyPublisher()
}

This is so much better! No more managing subscriptions with AnyCancellable! No more assigning to properties! We’re returning errors properly to the caller! But there is one wrinkle in this code that can trip you up. When we run this, we notice in our server logs that we’re contacting the auth server six times for the token every time we create a facade. Huh, that’s weird… Let’s take a look at why this is happening.

Above is a diagram of what we expected our above code to do, and what actually happened. On the left, we see the simple data flow we intended to define in the previous code sample, where each value is only created once. On the right, we see the actual outcome, where every intermediate value is being duplicated for every receiver. This is because the publishers we defined are only “recipes” for creating Subscriptions.

Subscriptions are the actual data-flow connections that are created when a subscriber connects to the pipeline. The subscription process happens in reverse, against the flow of data. By default, publishers don’t know about their existing subscriptions; they must create a new subscription to their upstream source each time they receive a downstream connection. That’s what you want in most cases, where stateless, value-type semantics offer safety and convenience, but in our case we need these intermediate publishers to load their data only once.
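You can reproduce the duplication without a server. In this sketch, a hypothetical `Deferred` token publisher counts how many times it is subscribed, and the graph mirrors the one from the “03 – Combine Flavored” page, with plain `map` in place of the real data-source calls and no threading operators. Because every downstream connection builds its own upstream subscription, a single subscriber at the end triggers six token “requests”.

```swift
import Combine

/// Builds the Combine-flavored graph and reports how often the token publisher was subscribed.
func countTokenRequests() -> (requests: Int, facades: [String]) {
    var requests = 0

    // Hypothetical stand-in for getTokenPublisher(): Deferred runs this closure
    // once per subscription, just like a fresh network request would.
    let tokenPub = Deferred { () -> Just<String> in
        requests += 1
        return Just("token")
    }

    let aPub = tokenPub.map { "A(\($0))" }
    let userDataPub = aPub.map { "user via \($0)" }
    let userAndTokenPub = userDataPub.combineLatest(tokenPub)
    let bPub = userAndTokenPub.map { "B(\($0), \($1))" }
    let cPub = userAndTokenPub.map { "C(\($0), \($1))" }

    var facades: [String] = []
    let subscription = userDataPub.combineLatest(aPub, bPub, cPub)
        .map { "\($0) | \($1) | \($2) | \($3)" }
        .sink { facades.append($0) }
    subscription.cancel()

    return (requests, facades)
}

let result = countTokenRequests()
print("Token requests: \(result.requests)") // prints "Token requests: 6"
```

Counting by path: `userDataPub` and `aPub` each reach the token once, while `bPub` and `cPub` each reach it twice through `userAndTokenPub`.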

Spread the Word with Reference-type Publishers

Luckily Combine has a solution for this: class-type publishers like Share, Multicast, and Autoconnect. Share is the simplest to use since (per Apple’s documentation) it’s “effectively a combination of Multicast and PassthroughSubject, with an implicit .autoconnect().” We’ll update our re-used publishers to use .share() so they can publish to multiple downstreams.

// From page “04 – Shared Publishers”
func getAllDataSources(userName: String) -> AnyPublisher<MyDataSourceFacade, Error> {

    let tokenPub = getTokenPublisher()
        .share()

    let APub = tokenPub
        .tryMap { getDataSourceA($0) }
        .share()

    let userDataPub = APub
        .tryMap { $0.getData(for: userName) }
        .share()

    let userAndTokenPub = userDataPub.combineLatest(tokenPub)
        .share()

    let BPub = userAndTokenPub
        .tryMap { getDataSourceB($0.0, $0.1) }

    let CPub = userAndTokenPub
        .tryMap { getDataSourceC($0.0, $0.1) }

    return userDataPub.combineLatest(APub, BPub, CPub)
        .compactMap { (userData, A, B, C) -> MyDataSourceFacade? in
            print("Returning facade on \(Thread.current.description)")
            return MyDataSourceFacade(userData, A, B, C)
        }
        .subscribe(on: backgroundQueue)
        .receive(on: DispatchQueue.main)
        .eraseToAnyPublisher()
}
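To check that `share()` collapses those duplicate subscriptions, the same graph can be driven by a `PassthroughSubject` (a hypothetical stand-in for the token request) while counting upstream subscriptions with `handleEvents(receiveSubscription:)`. One caveat worth knowing: `share()` relays values through a `PassthroughSubject`, so a subscriber that attaches after a value was emitted doesn’t receive it. The sketch therefore sends the token only after the whole pipeline is connected; with a real network request the response normally arrives well after subscription, but a synchronous or cached token source would need extra care.

```swift
import Combine

/// Wires the shared graph, sends one token, and reports upstream subscriptions and facades.
func runSharedPipeline() -> (tokenSubscriptions: Int, facades: [String]) {
    var tokenSubscriptions = 0
    let tokenRequest = PassthroughSubject<String, Never>() // stands in for the auth call

    let tokenPub = tokenRequest
        .handleEvents(receiveSubscription: { _ in tokenSubscriptions += 1 })
        .share()

    let aPub = tokenPub.map { "A(\($0))" }.share()
    let userDataPub = aPub.map { "user via \($0)" }.share()
    let userAndTokenPub = userDataPub.combineLatest(tokenPub).share()
    let bPub = userAndTokenPub.map { "B(\($0), \($1))" }
    let cPub = userAndTokenPub.map { "C(\($0), \($1))" }

    var facades: [String] = []
    let subscription = userDataPub.combineLatest(aPub, bPub, cPub)
        .map { "\($0) | \($1) | \($2) | \($3)" }
        .sink { facades.append($0) }

    // Emit only after every branch has subscribed to the shared publishers.
    tokenRequest.send("token")
    subscription.cancel()

    return (tokenSubscriptions, facades)
}

let shared = runSharedPipeline()
print("Token subscriptions: \(shared.tokenSubscriptions), facades: \(shared.facades.count)")
```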

And that’s it! For real this time. Sorry for the deception, but I wanted to present this in a realistic, iterative-problem-solving way, so you could directly see what sort of issues you may run into when using Combine in the real world.

In fact, this blog post is almost exactly the path I took in a recent project (minus a lot of frustration and soul-searching along the way). But once I had a breakthrough on how to use Combine “The Right Way,” I was honestly giddy. And I never use that word (it sounds gross to me). So I felt the need to share and hopefully help anyone else out there struggling to take their first steps into the reactive world.

The post Learning how to think in pipelines with Combine appeared first on Digital Product Development Agency | Big Nerd Ranch.
