Option to make Connect Proxy work with HTTP traffic?

Oops. Can I just store them and, once the connection is established, send them out with fireChannelRead, or is more needed?

Do I need to wrap the .end in a similar fashion to .body?

You can just store them. You do need to wrap the .end as well: it likely seems fairly foolish, but HTTPServerRequestPart.end and HTTPClientRequestPart.end are technically different types.
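To illustrate why the wrapping is needed: in NIOHTTP1 both aliases are instances of the generic HTTPPart enum and differ only in the body type (ByteBuffer for server request parts, IOData for client request parts), so every case has to be rebuilt in the target type, even .end. The sketch below shows this with stand-in types so that it runs without NIO. Part, Bytes, IOPayload and clientPart(from:) are all made up for illustration and are not NIO API.

```swift
// Stand-in mirroring the shape of NIOHTTP1's HTTPPart<HeadT, BodyT>.
enum Part<Head, Body> {
    case head(Head)
    case body(Body)
    case end([String: String]?)   // trailers; NIO uses HTTPHeaders? here
}

// Stand-in for ByteBuffer.
struct Bytes: Equatable {
    var storage: [UInt8]
}

// Stand-in for IOData, which can wrap a byte buffer.
enum IOPayload: Equatable {
    case bytes(Bytes)
}

typealias ServerRequestPart = Part<String, Bytes>      // like HTTPServerRequestPart
typealias ClientRequestPart = Part<String, IOPayload>  // like HTTPClientRequestPart

// Rebuilds each case in the client-side type. Only .body changes its payload,
// but .head and .end still need re-wrapping because the enum type differs.
func clientPart(from part: ServerRequestPart) -> ClientRequestPart {
    switch part {
    case .head(let head):
        return .head(head)
    case .body(let bytes):
        return .body(.bytes(bytes))
    case .end(let trailers):
        return .end(trailers)
    }
}
```

In the real handler the same three-way switch would produce .head(head), .body(.byteBuffer(buffer)) and .end(trailers).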

So here is what I came up with:

private var bufferedBody: ByteBuffer?
private var bufferedEnd: HTTPHeaders?

And modified guard:

guard case .head(var head) = self.unwrapInboundIn(data) else {
            let unwrapped = self.unwrapInboundIn(data)
            
            switch unwrapped {
            case .body(let buffer):
                
                switch state {
                case .connected:
                    context.fireChannelRead(self.wrapInboundOut(.body(.byteBuffer(buffer))))
                case .pendingConnection(_):
                    print("Buffering body")
                    self.bufferedBody = buffer
                default:
                    // shouldnt happen
                    break
                }
                
            case .end(let headers):
                switch state {
                case .connected:
                    context.fireChannelRead(self.wrapInboundOut(.end(headers)))
                case .pendingConnection(_):
                    print("Buffering end")
                    self.bufferedEnd = headers
                default:
                    // shouldnt happen
                    break
                }
                
            case .head(_):
                assertionFailure("Not possible")
                break
            }
            return
        }

And when glue method is done:

context.fireChannelRead(self.wrapInboundOut(.head(head)))
                
                if let bufferedBody = self.bufferedBody {
                    context.fireChannelRead(self.wrapInboundOut(.body(.byteBuffer(bufferedBody))))
                    self.bufferedBody = nil
                }
                
                if let bufferedEnd = self.bufferedEnd {
                    context.fireChannelRead(self.wrapInboundOut(.end(bufferedEnd)))
                    self.bufferedEnd = nil
                }

But even with these modifications it appears to be stuck still.
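One detail in the buffering above that may matter independently of the stuck state: a single optional ByteBuffer keeps only the most recent body chunk, so a request body that arrives in several .body parts while the connection is pending would lose all but the last one. A minimal sketch of an append-and-replay buffer follows. Part and PendingParts are hypothetical stand-ins so the snippet runs without NIO; in the handler the elements would be the already-wrapped client request parts.

```swift
// Stand-in for the request parts flowing through the handler; not a NIO type.
enum Part: Equatable {
    case head(String)
    case body([UInt8])
    case end
}

/// Holds parts that arrive while the outbound connection is still pending.
struct PendingParts {
    private var parts: [Part] = []

    var isEmpty: Bool { parts.isEmpty }

    /// Appends rather than overwrites, so several body chunks all survive.
    mutating func buffer(_ part: Part) {
        parts.append(part)
    }

    /// Empties the buffer, then hands every part to `deliver` in arrival order.
    mutating func drain(_ deliver: (Part) -> Void) {
        let toDeliver = parts
        parts.removeAll()
        for part in toDeliver {
            deliver(part)
        }
    }
}
```

With this shape, the replay step becomes a single drain call whose closure does the fireChannelRead, and the order head, body chunks, end is preserved automatically.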

I suspect my "smart" completion handler for the glue method does not work. The original Connect proxy uses the removeHandler method to do the fireChannelRead but when I try to use it similarly, I get another fatal error:

Fatal error: tried to decode as type IOData but found HTTPPart<HTTPResponseHead, ByteBuffer>

What does your smart completion handler look like?

Let's leave the "smart" out :smiley:

My modified glue method looks like this:

private func glue(_ peerChannel: Channel, context: ChannelHandlerContext, onSuccess: @escaping (() -> Void)) {

        self.removeEncoder(context: context)

        // Now we need to glue our channel and the peer channel together.
        let (localGlue, peerGlue) = GlueHandler.matchedPair()
        context.channel.pipeline.addHandler(localGlue).and(peerChannel.pipeline.addHandler(peerGlue)).whenComplete { result in
            switch result {
            case .success(_):
                context.pipeline.removeHandler(self, promise: nil)
                onSuccess()
            case .failure(_):
                // Close connected peer channel before closing our channel.
                peerChannel.close(mode: .all, promise: nil)
                context.close(promise: nil)
            }
        }
    }

And its usage after connect is successful:

self.glue(channel, context: context) { [unowned self] in
            if case let .pendingConnection(head) = self.state {
                self.state = .connected

                context.fireChannelRead(self.wrapInboundOut(.head(head)))
                
                if let bufferedBody = self.bufferedBody {
                    context.fireChannelRead(self.wrapInboundOut(.body(.byteBuffer(bufferedBody))))
                    self.bufferedBody = nil
                }
                
                if let bufferedEnd = self.bufferedEnd {
                    context.fireChannelRead(self.wrapInboundOut(.end(bufferedEnd)))
                    self.bufferedEnd = nil
                }
            }
        }

This seems to fire, but I fear that there is a race condition. The original Connect proxy uses removeHandler method to do the fireChannelRead but that fatal errors in my case.

For reference, below is the connect proxy implementation:

func removeHandler(context: ChannelHandlerContext, removalToken: ChannelHandlerContext.RemovalToken) {
        var didRead = false

        // We are being removed, and need to deliver any pending bytes we may have if we're upgrading.
        while case .upgradeComplete(var pendingBytes) = self.upgradeState, pendingBytes.count > 0 {
            // Avoid a CoW while we pull some data out.
            self.upgradeState = .upgradeComplete(pendingBytes: [])
            let nextRead = pendingBytes.removeFirst()
            self.upgradeState = .upgradeComplete(pendingBytes: pendingBytes)

            context.fireChannelRead(nextRead)
            didRead = true
        }

        if didRead {
            context.fireChannelReadComplete()
        }

        print("Removing \(self) from pipeline")
        context.leavePipeline(removalToken: removalToken)
    }

I currently have an empty one, because this did not work (with the code from my completion handler).

Ok great, so in your case I think you're probably not executing much of this code at all. In particular, because you're doing a removeHandler first, you're not in the pipeline when the completion handler fires and so all the data gets dropped when you call fireChannelRead. We have to actually address that fatal error you were hitting.

Is your proxy handler a ChannelDuplexHandler? If so, what's its OutboundIn type?

Also I want to say sorry that this problem takes so long :disappointed_relieved:

It is a ChannelInboundHandler based on the previously provided scaffolding. Really hope I don't have to make this duplex, because that sounds even more complicated.

I will modify my handler to use the removeHandler method to send the data and report back what it does.

Okay, so I removed that completion handler, and implemented removeHandler like this:

func removeHandler(context: ChannelHandlerContext, removalToken: ChannelHandlerContext.RemovalToken) {

        if case let .pendingConnection(head) = self.state {
            self.state = .connected

            context.fireChannelRead(self.wrapInboundOut(.head(head)))
            
            if let bufferedBody = self.bufferedBody {
                context.fireChannelRead(self.wrapInboundOut(.body(.byteBuffer(bufferedBody))))
                self.bufferedBody = nil
            }
            
            if let bufferedEnd = self.bufferedEnd {
                context.fireChannelRead(self.wrapInboundOut(.end(bufferedEnd)))
                self.bufferedEnd = nil
            }
            
            context.fireChannelReadComplete() // fatal error
        }
        
        context.leavePipeline(removalToken: removalToken)
    }

If I keep the fireChannelReadComplete call, I get a fatal error:

Fatal error: tried to decode as type IOData but found HTTPPart<HTTPResponseHead, ByteBuffer>

Without it, nothing happens. Same "stuck" state as before.

I was wondering how complicated that would be in Macro.swift (a thin wrapper around NIO) and hacked up this thing: httpproxy.swift
Disclaimer: Doing this kind of stuff directly in NIO is faster for sure. Also having an httpproxy as a sample project would be quite nice.

Wow :open_mouth: Had no idea an option like this exists. My use case is quite performance sensitive, but I expect most traffic to be HTTPS and thus go through the SwiftNIO route.

This looks like a great backup plan if I don't manage to finish my HTTPProxyHandler. Thanks for the example!

Yes, a proxy is a prime example of an app which should actually be done in SwiftNIO itself.

I have tried this proxy you implemented, and for some stuff it works (like images from an http CDN), but when I try to access http:// in Safari I see a white page and the following error in the console:

Task <9045CD11-EF0E-4C87-BC6F-FB22D1D61F19>.<259> finished with error [-1007] Error Domain=NSURLErrorDomain Code=-1007 "too many HTTP redirects" UserInfo={NSLocalizedDescription=too many HTTP redirects, NSErrorFailingURLStringKey=, NSErrorFailingURLKey=, _NSURLErrorRelatedURLSessionTaskErrorKey=, _NSURLErrorFailingURLSessionTaskErrorKey=, NSUnderlyingError=0x147e3fe20 {Error Domain=kCFErrorDomainCFNetwork Code=-1007}}

Without the proxy, I either get redirected to the HTTPS variant or see the actual webpage with "Not Secure" in the address bar.

The agent here is using URLSession, so you are probably just cycling there. Also it does not support CONNECT, it is just a plain HTTP proxy.

Yea, so in a case where the server redirects HTTP -> HTTPS it does not work, but it does not seem to work for plain HTTP websites either, where no redirect is happening.

Ok great, and you’re executing this after the GlueHandler is added, yes? What other handlers are in the partner pipeline?

Yes, after GlueHandler is successfully added, it calls the context.pipeline.removeHandler(self, promise: nil) as in the Connect proxy example.

By "partner pipeline" you mean the server bootstrap?

Below is my current code, in case you want to check out the sequence of calls:

final class HTTPConnectHandler: ChannelInboundHandler, RemovableChannelHandler {
    func removeHandler(context: ChannelHandlerContext, removalToken: ChannelHandlerContext.RemovalToken) {

        if case let .pendingConnection(head) = self.state {
            self.state = .connected

            context.fireChannelRead(self.wrapInboundOut(.head(head)))
            
            if let bufferedBody = self.bufferedBody {
                context.fireChannelRead(self.wrapInboundOut(.body(.byteBuffer(bufferedBody))))
                self.bufferedBody = nil
            }
            
            if let bufferedEnd = self.bufferedEnd {
                context.fireChannelRead(self.wrapInboundOut(.end(bufferedEnd)))
                self.bufferedEnd = nil
            }
            
            context.fireChannelReadComplete()
        }
        
        context.leavePipeline(removalToken: removalToken)
    }
    
    enum State {
        case idle
        case pendingConnection(head: HTTPRequestHead)
        case connected
    }
    
    enum ConnectError: Error {
        case invalidURL
        case wrongScheme
        case wrongHost
    }
    
    typealias InboundIn = HTTPServerRequestPart
    typealias InboundOut = HTTPClientRequestPart
    
    private var state = State.idle
    
    private var bufferedBody: ByteBuffer?
    private var bufferedEnd: HTTPHeaders?
    
    func channelRead(context: ChannelHandlerContext, data: NIOAny) {
        
        guard case .head(var head) = self.unwrapInboundIn(data) else {
            let unwrapped = self.unwrapInboundIn(data)
            
            switch unwrapped {
            case .body(let buffer):
                
                switch state {
                case .connected:
                    context.fireChannelRead(self.wrapInboundOut(.body(.byteBuffer(buffer))))
                case .pendingConnection(_):
                    print("Buffering body")
                    self.bufferedBody = buffer
                default:
                    // shouldnt happen
                    break
                }
                
            case .end(let headers):
                switch state {
                case .connected:
                    context.fireChannelRead(self.wrapInboundOut(.end(headers)))
                case .pendingConnection(_):
                    print("Buffering end")
                    self.bufferedEnd = headers
                default:
                    // shouldnt happen
                    break
                }
                
            case .head(_):
                assertionFailure("Not possible")
                break
            }
            return
        }
        
        os_log(.default, log: .default, "Connecting to URI: %{public}s", head.uri as NSString)
        
        guard let parsedUrl = URL(string: head.uri) else {
            context.fireErrorCaught(ConnectError.invalidURL)
            return
        }
        
        os_log(.default, log: .default, "Parsed scheme: %{public}s", (parsedUrl.scheme ?? "no scheme") as NSString)
        
        guard parsedUrl.scheme == "http" else {
            context.fireErrorCaught(ConnectError.wrongScheme)
            return
        }
        
        guard let host = head.headers.first(name: "Host"), host == parsedUrl.host else {
            os_log(.default, log: .default, "Wrong host")
            context.fireErrorCaught(ConnectError.wrongHost)
            return
        }
        
        var targetUrl = parsedUrl.path
        
        if let query = parsedUrl.query {
            targetUrl += "?\(query)"
        }
        
        head.uri = targetUrl
        
        switch state {
        case .idle:
            state = .pendingConnection(head: head)
            connectTo(host: host, port: 80, context: context)
        case .pendingConnection(_):
            os_log(.default, log: .default, "Logic error fireChannelRead with incorrect state")
            
        case .connected:
            context.fireChannelRead(self.wrapInboundOut(.head(head)))
        }
        
    }
    
    private func connectTo(host: String, port: Int, context: ChannelHandlerContext) {
        
        let channelFuture = ClientBootstrap(group: context.eventLoop)
            .channelInitializer { channel in
                channel.pipeline.addHandler(HTTPRequestEncoder()).flatMap {
                    channel.pipeline.addHandler(ByteToMessageHandler(HTTPResponseDecoder(leftOverBytesStrategy: .forwardBytes)))
                }
            }
            .connect(host: host, port: port)
        
        

        channelFuture.whenSuccess { channel in
            self.connectSucceeded(channel: channel, context: context)
        }
        channelFuture.whenFailure { error in
            self.connectFailed(error: error, context: context)
        }
    }
    
    private func connectSucceeded(channel: Channel, context: ChannelHandlerContext) {
        os_log(.default, log: .default, "Connect succeeded")
        
        self.glue(channel, context: context)
    }

    private func connectFailed(error: Error, context: ChannelHandlerContext) {
        os_log(.error, log: .default, "Connect failed: %@", error as NSError)
        context.fireErrorCaught(error)
    }
    
    private func glue(_ peerChannel: Channel, context: ChannelHandlerContext) {

        self.removeEncoder(context: context)

        // Now we need to glue our channel and the peer channel together.
        let (localGlue, peerGlue) = GlueHandler.matchedPair()
        context.channel.pipeline.addHandler(localGlue).and(peerChannel.pipeline.addHandler(peerGlue)).whenComplete { result in
            switch result {
            case .success(_):
                context.pipeline.removeHandler(self, promise: nil)
            case .failure(_):
                // Close connected peer channel before closing our channel.
                peerChannel.close(mode: .all, promise: nil)
                context.close(promise: nil)
            }
        }
    }
    
    private func removeEncoder(context: ChannelHandlerContext) {
        context.pipeline.context(handlerType: HTTPResponseEncoder.self).whenSuccess {
            context.pipeline.removeHandler(context: $0, promise: nil)
        }
    }
    
}
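Two things in the URI handling above might be worth a second look, separately from the pipeline issue. Foundation's URL.path returns the percent-decoded path and is empty for a URL such as http://example.com, and a Host header may carry a port, in which case comparing it to parsedUrl.host fails and the hard-coded port 80 would be wrong anyway. The following is a sketch only, using URLComponents to keep the original encoding. ProxyTarget and proxyTarget(fromAbsoluteURI:) are hypothetical names introduced here, not part of the handler or of NIO.

```swift
import Foundation

/// Where to connect and what to put on the request line of the forwarded request.
struct ProxyTarget: Equatable {
    let host: String
    let port: Int
    let requestTarget: String   // origin-form, e.g. "/index.html?x=1"
}

/// Splits an absolute-form request URI (as sent to a plain HTTP proxy)
/// into connection details plus an origin-form request target.
/// Returns nil for anything that is not a usable http:// URI.
func proxyTarget(fromAbsoluteURI uri: String) -> ProxyTarget? {
    guard let components = URLComponents(string: uri),
          components.scheme?.lowercased() == "http",
          let host = components.host, !host.isEmpty else {
        return nil
    }

    // Keep the percent-encoding as received; an empty path becomes "/".
    let path = components.percentEncodedPath
    var target = path.isEmpty ? "/" : path
    if let query = components.percentEncodedQuery {
        target += "?" + query
    }

    // Use the port from the URI when present, otherwise the HTTP default.
    return ProxyTarget(host: host, port: components.port ?? 80, requestTarget: target)
}
```

The handler could then set head.uri from requestTarget and pass host and port to connectTo, rather than relying on the Host header matching exactly.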

Great, don’t remove the HTTPResponseEncoder in private func removeEncoder.

Relatedly I’m afraid you’re going to need to make this a duplex handler. Your OutboundIn will be HTTPClientResponsePart and your OutboundOut will be HTTPServerResponsePart, and then you’ll need a func write implementation like this:

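func write(context: ChannelHandlerContext, data: NIOAny, promise: EventLoopPromise<Void>?) {
    // Re-wrap each response part from the client type into the server type,
    // forwarding the promise so the writer is notified when the write completes.
    switch self.unwrapOutboundIn(data) {
    case .head(let head):
        context.write(self.wrapOutboundOut(.head(head)), promise: promise)
    case .body(let body):
        // The server-side body is IOData, so the ByteBuffer must be wrapped.
        context.write(self.wrapOutboundOut(.body(.byteBuffer(body))), promise: promise)
    case .end(let trailers):
        context.write(self.wrapOutboundOut(.end(trailers)), promise: promise)
    }
}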

Okay, thanks. Will try. What about the issue with HTTP -> HTTPS upgrade based on server redirect that I discovered while trying the example by @Helge_Hess1?

Will it be doable? Because I am afraid otherwise I will still have the same problem in most cases :(

Ok, changed to ChannelDuplexHandler and stopped removing the encoder.

But now the fatal error is back; it happens in the GlueHandler:

func channelRead(context: ChannelHandlerContext, data: NIOAny) {
        self.partner?.partnerWrite(data)
    }

Fatal error: tried to decode as type HTTPPart<HTTPResponseHead, IOData> but found HTTPPart<HTTPResponseHead, ByteBuffer>

Based on the headers it appears that I successfully got a response from the server. It looks like I need to modify the client pipeline, but I am not sure what the correct order is. I tried adding encoders/decoders from the server pipeline as well, but got a different fatal error.

Currently the pipeline looks like this:

let channelFuture = ClientBootstrap(group: context.eventLoop)
            .channelInitializer { channel in
                channel.pipeline.addHandler(HTTPRequestEncoder()).flatMap {
                    channel.pipeline.addHandler(ByteToMessageHandler(HTTPResponseDecoder(leftOverBytesStrategy: .forwardBytes)))
                }
            }
            .connect(host: host, port: port)

A redirect is just a redirect. I don't know what the intention of your proxy is, but for tunnelling https you likely just need to use the original CONNECT proxy :slight_smile: (it's not hard to add to my sample, upstream I just disabled CONNECT to avoid confusion about what it does)