Yes, after GlueHandler is successfully added, it calls the context.pipeline.removeHandler(self, promise: nil) as in the Connect proxy example.
By "partner pipeline" you mean the server bootstrap?
Below is my current code, in case you want to check the sequence of calls:
/// Inbound handler that accepts an absolute-form `http://` request, opens a client
/// connection to the requested host, and then glues the two channels together.
///
/// While the outbound connection is being established, the request parts that have
/// already arrived (head, body chunks, end) are buffered. They are replayed down the
/// pipeline, rewritten as `HTTPClientRequestPart`s, when this handler is removed.
final class HTTPConnectHandler: ChannelInboundHandler, RemovableChannelHandler {
    typealias InboundIn = HTTPServerRequestPart
    typealias InboundOut = HTTPClientRequestPart

    /// Connection life cycle of this handler.
    enum State {
        /// No request head has been seen yet.
        case idle
        /// A head was accepted and the outbound connect is in flight; `head` is the
        /// rewritten head to forward once the handler is removed.
        case pendingConnection(head: HTTPRequestHead)
        /// Buffered parts have been replayed; further parts are forwarded directly.
        case connected
    }

    /// Validation failures reported through `fireErrorCaught`.
    enum ConnectError: Error {
        case invalidURL
        case wrongScheme
        case wrongHost
    }

    private var state = State.idle

    /// All body bytes received while the connection is pending, in arrival order.
    private var bufferedBody: ByteBuffer?

    /// Whether `.end` was received while the connection was pending. Tracked separately
    /// from the trailers because `.end` carries optional trailers and a request without
    /// trailers must still have its end forwarded.
    private var hasBufferedEnd = false

    /// Trailers that accompanied the buffered `.end`, if any.
    private var bufferedTrailers: HTTPHeaders?

    // MARK: - ChannelInboundHandler

    /// Routes each incoming request part: heads start the connect sequence, body and end
    /// parts are forwarded when connected or buffered while the connection is pending.
    func channelRead(context: ChannelHandlerContext, data: NIOAny) {
        switch self.unwrapInboundIn(data) {
        case .head(let head):
            self.handleHead(head, context: context)

        case .body(var chunk):
            switch self.state {
            case .connected:
                context.fireChannelRead(self.wrapInboundOut(.body(.byteBuffer(chunk))))
            case .pendingConnection:
                print("Buffering body")
                // Append rather than overwrite: several body chunks may arrive
                // before the outbound connection completes.
                if self.bufferedBody != nil {
                    self.bufferedBody?.writeBuffer(&chunk)
                } else {
                    self.bufferedBody = chunk
                }
            case .idle:
                // Body without a preceding head: shouldn't happen.
                break
            }

        case .end(let trailers):
            switch self.state {
            case .connected:
                context.fireChannelRead(self.wrapInboundOut(.end(trailers)))
            case .pendingConnection:
                print("Buffering end")
                self.hasBufferedEnd = true
                self.bufferedTrailers = trailers
            case .idle:
                // End without a preceding head: shouldn't happen.
                break
            }
        }
    }

    // MARK: - RemovableChannelHandler

    /// Replays the buffered request (head, body, end) down the pipeline before leaving it.
    func removeHandler(context: ChannelHandlerContext, removalToken: ChannelHandlerContext.RemovalToken) {
        if case .pendingConnection(let head) = self.state {
            self.state = .connected

            context.fireChannelRead(self.wrapInboundOut(.head(head)))

            if let body = self.bufferedBody {
                self.bufferedBody = nil
                context.fireChannelRead(self.wrapInboundOut(.body(.byteBuffer(body))))
            }

            if self.hasBufferedEnd {
                let trailers = self.bufferedTrailers
                self.hasBufferedEnd = false
                self.bufferedTrailers = nil
                // Forward the end even when there are no trailers.
                context.fireChannelRead(self.wrapInboundOut(.end(trailers)))
            }

            context.fireChannelReadComplete()
        }

        context.leavePipeline(removalToken: removalToken)
    }

    // MARK: - Request head handling

    /// Validates the request head, rewrites its URI from absolute form to origin form,
    /// and either starts the outbound connection or forwards the head.
    private func handleHead(_ originalHead: HTTPRequestHead, context: ChannelHandlerContext) {
        var head = originalHead

        os_log(.default, log: .default, "Connecting to URI: %{public}s", head.uri as NSString)
        guard let parsedUrl = URL(string: head.uri) else {
            context.fireErrorCaught(ConnectError.invalidURL)
            return
        }

        os_log(.default, log: .default, "Parsed scheme: %{public}s", (parsedUrl.scheme ?? "no scheme") as NSString)
        guard parsedUrl.scheme == "http" else {
            context.fireErrorCaught(ConnectError.wrongScheme)
            return
        }

        guard let host = head.headers.first(name: "Host"), host == parsedUrl.host else {
            os_log(.default, log: .default, "Wrong host")
            context.fireErrorCaught(ConnectError.wrongHost)
            return
        }

        // `URL.path` is percent-decoded; take the encoded path so escaped characters
        // reach the upstream server unchanged.
        let components = URLComponents(url: parsedUrl, resolvingAgainstBaseURL: false)
        var targetUrl = components?.percentEncodedPath ?? parsedUrl.path
        if targetUrl.isEmpty {
            // "http://host" has an empty path; the origin-form request target must be "/".
            targetUrl = "/"
        }
        if let query = parsedUrl.query {
            targetUrl += "?\(query)"
        }
        head.uri = targetUrl

        switch self.state {
        case .idle:
            self.state = .pendingConnection(head: head)
            // Always connects on port 80; any port in the request URI is ignored.
            self.connectTo(host: host, port: 80, context: context)
        case .pendingConnection:
            os_log(.default, log: .default, "Logic error fireChannelRead with incorrect state")
        case .connected:
            context.fireChannelRead(self.wrapInboundOut(.head(head)))
        }
    }

    // MARK: - Outbound connection

    /// Opens a client channel to `host:port` with an HTTP request encoder and response
    /// decoder installed, then reports the outcome to `connectSucceeded`/`connectFailed`.
    private func connectTo(host: String, port: Int, context: ChannelHandlerContext) {
        let channelFuture = ClientBootstrap(group: context.eventLoop)
            .channelInitializer { channel in
                channel.pipeline.addHandler(HTTPRequestEncoder()).flatMap {
                    channel.pipeline.addHandler(
                        ByteToMessageHandler(HTTPResponseDecoder(leftOverBytesStrategy: .forwardBytes))
                    )
                }
            }
            .connect(host: host, port: port)

        channelFuture.whenSuccess { channel in
            self.connectSucceeded(channel: channel, context: context)
        }
        channelFuture.whenFailure { error in
            self.connectFailed(error: error, context: context)
        }
    }

    /// Called when the outbound connection is established; glues both channels together.
    private func connectSucceeded(channel: Channel, context: ChannelHandlerContext) {
        os_log(.default, log: .default, "Connect succeeded")
        self.glue(channel, context: context)
    }

    /// Called when the outbound connection could not be established; logs the error
    /// and propagates it down the pipeline.
    private func connectFailed(error: Error, context: ChannelHandlerContext) {
        os_log(.error, log: .default, "Connect failed: %@", error as NSError)
        context.fireErrorCaught(error)
    }

    /// Installs a matched pair of `GlueHandler`s on the local and peer channels. On
    /// success this handler removes itself, which triggers the replay in `removeHandler`;
    /// on failure both channels are closed.
    private func glue(_ peerChannel: Channel, context: ChannelHandlerContext) {
        self.removeEncoder(context: context)

        // Now we need to glue our channel and the peer channel together.
        let (localGlue, peerGlue) = GlueHandler.matchedPair()
        context.channel.pipeline.addHandler(localGlue)
            .and(peerChannel.pipeline.addHandler(peerGlue))
            .whenComplete { result in
                switch result {
                case .success:
                    context.pipeline.removeHandler(self, promise: nil)
                case .failure:
                    // Close connected peer channel before closing our channel.
                    peerChannel.close(mode: .all, promise: nil)
                    context.close(promise: nil)
                }
            }
    }

    /// Removes the `HTTPResponseEncoder` from this channel's pipeline if one is present.
    private func removeEncoder(context: ChannelHandlerContext) {
        context.pipeline.context(handlerType: HTTPResponseEncoder.self).whenSuccess {
            context.pipeline.removeHandler(context: $0, promise: nil)
        }
    }
}