Hi! I'm working on a Bitcoin RPC+P2P protocol client based on SwiftNIO + ServiceLifecycle async APIs.
The RPC server is always on but the P2P server can be made to start/stop listening via RPC command.
To keep the P2P service running even when it's not listening, I've had to resort to a bit of a hack: an empty AsyncChannel that is never sent any actual message. The point is to be able to leverage cancelOnGracefulShutdown(). Here it is:
public func run() async throws {
    status.isRunning = true // Service is always running as long as it hasn't fully stopped
    await withGracefulShutdownHandler {
        // We want to keep the service alive while we connect/disconnect from servers. Unless there is a shutdown signal.
        for await _ in AsyncChannel<()>().cancelOnGracefulShutdown() { }
    } onGracefulShutdown: {
        // P2P server shutting down gracefully…
    }
}
I'm sure there's a better way to keep the service hanging around before it gets replaced with an actual protocol handler. Or maybe the actual NIO channel handler can be somehow suspended?
The biggest impediments seem to be:
Having the stand-by service respond to graceful shutdown requests.
Keeping everything in the same Task tree so that cancellation propagates properly.
I suspect a solution might come from having some sort of TaskGroup similar to how SwiftNIO handles incoming connections but I wasn't able to put that idea into code.
Anyway I appreciate any help/suggestions/comments regarding this specific issue or the overall design/implementation of my solution. Thanks!
The reason you need the "empty" AsyncChannel is that your P2PService isn't using structured concurrency: it creates an unstructured Task {} to run the actual underlying server. What you should do instead is bootstrap your server inside the run method and start handling the incoming connections there. This way you can also handle the graceful shutdown inside that run method.
Thank you! That's interesting… It's true that I'm creating an unstructured Task for the actual bootstrapping, which breaks the entire model. It would be easier to simply await the server channel inside run(), but how can I have the service stand by (as in not accepting connections) until the user turns it on/off via RPC command?
public func run() async throws {
    …
    await withGracefulShutdownHandler {
        try await withThrowingDiscardingTaskGroup { group in
            try await serverChannel.executeThenClose { serverChannelInbound in
                // We are now listening for inbound connections which we DO NOT want as the RPC has not yet received the "start-listening-p2p" command.
The root of the issue appears to be that ServiceLifecycle.ServiceGroup needs all services up front, before run() is called. I would like to add additional services later on, as the main always-on RPC server receives commands to start/stop listening to P2P.
Unless there is a way to do something at this level…
I'll keep looking into Hummingbird's sources for clues and inspiration. I like how it uses a state machine for server status transitions but can't figure out whether it is possible to go from stopped to running again.
That's not true. ServiceLifecycle doesn't enforce anything here and a common pattern is for run() to start servers and start handling traffic. This means for your concrete service that it has to deal with the fact that it might get calls to other methods before the server is actually running. It can either queue them or reject them. Generally queuing is preferred.
Now to your concrete problem. I would recommend adding all services to the group at the beginning and injecting the p2pService into the rpcService, so that the rpcService can inform the p2pService when a request comes in and it should start accepting connections. In the run method of your p2pService you can then await on an AsyncChannel<Void> until it receives a value, then start accepting connections. Roughly speaking, I am thinking of something like this:
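import AsyncAlgorithms
import ServiceLifecycle

actor P2PService: Service {
    let channel = AsyncChannel<Void>()

    func run() async throws {
        // Suspend here until the first start signal arrives.
        _ = await channel.first { _ in true }
        // ServerBootstrap here
    }

    // AsyncChannel.send(_:) is async: it suspends until the value is consumed.
    func triggerStart() async {
        await channel.send(())
    }
}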
The main way I think about this is that there always needs to be a single task that owns a resource like a server socket. The run() method of Service is enforcing that pattern and you use async sequences to invert control and let those different tasks communicate with each other.
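To make the "single owner task plus async sequence" idea a bit more concrete, here is a minimal sketch using only the standard library. Everything in it is hypothetical and for illustration: P2PCommand, P2PMailbox and runOwner are made-up names, and AsyncStream is swapped in for AsyncChannel so the snippet has no package dependencies. One difference worth noting: AsyncStream buffers values, so commands sent before the owner starts consuming are queued, whereas AsyncChannel.send suspends the sender until the value is consumed.

```swift
// Hypothetical command type for illustration; it does not appear in the posts above.
enum P2PCommand: Sendable, Equatable {
    case startListening(port: Int)
    case stopListening
}

// Mailbox that other tasks (e.g. the RPC service) use to talk to the owner task.
// Assumption: AsyncStream from the standard library stands in for AsyncChannel.
// Unlike AsyncChannel, yield(_:) never suspends; values are buffered until consumed.
final class P2PMailbox: Sendable {
    let commands: AsyncStream<P2PCommand>
    private let continuation: AsyncStream<P2PCommand>.Continuation

    init() {
        let (stream, continuation) = AsyncStream.makeStream(of: P2PCommand.self)
        self.commands = stream
        self.continuation = continuation
    }

    // Called from any task; never touches the server socket itself.
    func send(_ command: P2PCommand) { continuation.yield(command) }

    // Ends the stream so the owner's loop can finish.
    func close() { continuation.finish() }
}

// Stand-in for run(): the single task that would own the server socket.
// It only records the commands here; real code would bind/close the socket.
func runOwner(_ mailbox: P2PMailbox) async -> [P2PCommand] {
    var handled: [P2PCommand] = []
    for await command in mailbox.commands {
        handled.append(command)
    }
    return handled
}
```

With this shape the RPC side would only ever call mailbox.send(…), and the loop in runOwner is the one place that reacts to commands, in the order they were sent. Whether buffering (AsyncStream) or back pressure (AsyncChannel) is the better fit depends on whether callers should wait for the service to pick up the request.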
Thanks to your proposed solution I was able to remove all unstructured tasks and fully adopt structured concurrency. All while being able to start/stop listening for incoming P2P connections on demand.
Same goes for outgoing client P2P connections that can now connect/disconnect on request by the RPC service.
Seems to work perfectly in practice. Here's how the final code looks:
private var listenRequests = AsyncChannel<()>() // This guy only receives "pings"

public func run() async throws {
    try await withGracefulShutdownHandler {
        for await _ in listenRequests.cancelOnGracefulShutdown() {
            try await startListening()
        }
    } onGracefulShutdown: { … }
}

public func start(on port: Int) async {
    status.port = port
    await listenRequests.send(()) // Signal to start listening
}
func startListening() async throws {
    // Actual bootstrapping/listening here:
    let serverChannel = try await bootstrap.bind(…) {…}
    …
    // The closure throws, so the handler call needs `try` as well.
    try await withGracefulShutdownHandler {
        try await withThrowingDiscardingTaskGroup { group in
            try await serverChannel.executeThenClose { serverChannelInbound in … }
        }
    } onGracefulShutdown: { … }
}
I'm sure it can be improved, but it now feels less hacky and, more importantly, everything is on the same task tree, which eliminates future headaches.