I’m not convinced a channel handler is needed here. Right now I’m using this noncopyable abstraction to start a monitoring task that runs alongside the main task that reads from the `NIOAsyncChannel`:
import Atomics
/// A noncopyable activity watchdog built around a single atomic counter.
///
/// The counter (the “epoch”) is created in ``init()`` and destroyed in `deinit`,
/// so its lifetime is tied to the lifetime of the `TimeCop` value itself.
struct TimeCop:~Copyable, Sendable
{
    /// The activity counter. It starts at zero.
    let epoch:UnsafeAtomic<UInt>

    /// Creates a watchdog whose epoch is initialized to `0`.
    init()
    {
        self.epoch = UnsafeAtomic<UInt>.create(0)
    }

    /// Destroys the atomic that was created in ``init()``.
    deinit
    {
        self.epoch.destroy()
    }
}
extension TimeCop
{
    /// Records activity by advancing the epoch.
    ///
    /// The increment wraps on overflow and uses relaxed memory ordering.
    func reset()
    {
        self.epoch.wrappingIncrement(ordering: .relaxed)
    }

    /// Sleeps in steps of `interval` and returns once the epoch read after a
    /// sleep equals the previously remembered value, i.e. once no ``reset()``
    /// has been observed since the last check.
    ///
    /// The remembered value starts at `0`, so if ``reset()`` is never called,
    /// this returns after the first sleep.
    ///
    /// - Parameter interval: How long to sleep between checks of the epoch.
    ///   Defaults to 1000 milliseconds.
    /// - Throws: Whatever `Task.sleep(for:)` throws.
    func start(interval:Duration = .milliseconds(1000)) async throws
    {
        var last:UInt = 0
        while true
        {
            try await Task.sleep(for: interval)

            let current:UInt = self.epoch.load(ordering: .relaxed)
            guard current != last
            else
            {
                // Nothing moved the epoch since the last check: time is up.
                return
            }

            // Activity was observed; remember it and wait another `interval`.
            last = current
        }
    }
}
It checks the value of the atomic counter every second. Since a new connection is usually accompanied by at least one request fragment, the first check normally sees a changed counter and waits another interval, so in practice it gives the client about two seconds to trigger some activity on the channel.
/// Reap idle connections. `TimeCop.start` returns only when the epoch is
/// unchanged between two consecutive checks, so with a 1000 ms interval the
/// channel is closed after between one and two seconds of inactivity (or after
/// one second if there was never any activity at all).
let cop:TimeCop = .init()
// Run the watchdog as an `async let` child task alongside the code that follows.
async
let _:Void =
{
(cop:borrowing TimeCop) in
// `cop` is taken as `borrowing`, so the noncopyable value is not consumed here.
// This call returns once no `reset()` has been observed for a full interval.
try await cop.start(interval: .milliseconds(1000))
// The watchdog fired: close the channel.
try await connection.channel.close()
} (cop)
Every time something is read from the inbound stream, I reset the timeout by incrementing the counter:
// Iterate over the frame payloads arriving on this stream.
var inbound:NIOAsyncChannelInboundStream<HTTP2Frame.FramePayload>.AsyncIterator =
stream.inbound.makeAsyncIterator()
// NOTE(review): presumably populated from a headers frame in the elided code below — confirm.
var headers:HPACKHeaders? = nil
while let payload:HTTP2Frame.FramePayload = try await inbound.next()
{
// Every payload read counts as activity: advance the epoch so the watchdog
// waits another interval instead of closing the channel.
cop.reset()
...
Then, when we are ready to send the response, I increment the counter once more:
// Count the start of the response as activity before writing it out.
cop.reset()
// NOTE(review): presumably writes `message` and then finishes the outbound side — confirm against `finish(with:)`.
try await stream.outbound.finish(with: message)