How to efficiently filter an array by another array? (In the functional paradigm)

Consider the following task.

Let A and B be lists of Comparable items, so entries in these lists can be compared.

The output is the list of entries in A that are not in B.

Two conditions:

  1. Assume A and B are sorted.
  2. Duplicates are counted: for each value, the output contains (count in A) minus (count in B) copies if that difference is greater than zero. Otherwise the value is skipped.

Example:
A = [1, 1, 1, 2, 4, 7]
B = [1, 2, 2, 4, 6]

Output:
[1, 1, 7]

Description:
A has three 1s and one 2.
B has one 1 and two 2s.

The result has (three - one) 1s and (one - two) 2s.
That is two 1s and (-1) 2s.
So the result has two 1s and no 2.
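
To make the counting rule concrete, here is a minimal reference sketch. It is not the recursive, sorted-merge solution the question asks for: it uses a dictionary of counts, so it assumes Hashable elements, and the name multisetDifference is hypothetical. It can serve as a check against which other implementations are compared.

```swift
// Reference check for the counting rule described above.
// Assumption: elements are Hashable; `multisetDifference` is a hypothetical name.
func multisetDifference<T: Hashable>(_ a: [T], _ b: [T]) -> [T] {
    // Count how many times each value occurs in b.
    var remaining = b.reduce(into: [T: Int]()) { counts, x in
        counts[x, default: 0] += 1
    }
    // Drop an element of a while an unmatched copy of it is left in b.
    return a.filter { x in
        if let n = remaining[x], n > 0 {
            remaining[x] = n - 1
            return false
        }
        return true
    }
}

print(multisetDifference([1, 1, 1, 2, 4, 7], [1, 2, 2, 4, 6])) // [1, 1, 7]
```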

Functional condition:

  1. Do it without loops.

Try to solve this task for different counts (900 to 100_000).

I am asking for an efficient (tail-call-optimized) solution.
Is it possible?

func generateArray(count: UInt = 900, range: UInt = 1000) -> [UInt] {
    return Array(0..<count).map{ _ in return UInt.random(in: 0...range) }
}

func test_ListsDifference() {
    let count: UInt = 10_000  // generateArray(count:) takes a UInt
    let a = generateArray(count: count).sorted()
    let b = generateArray(count: count).sorted()
    
    var result = [UInt]()
    self.measure {
        result = rejecting(a, b)
    }
}

My dirty solution so far

func rejecting<T: Comparable>(_ a: ArraySlice<T>, _ b: ArraySlice<T>) -> ArraySlice<T> {
  func _rejecting(_ a: ArraySlice<T>, _ b: ArraySlice<T>, aStart: ArraySlice<T>.Index, bStart: ArraySlice<T>.Index) -> ArraySlice<T> {
    if aStart == a.endIndex {
      return []
    }
    else if bStart == b.endIndex {
      return a[aStart..<a.endIndex]
    }
    else if a[aStart] == b[bStart] {
      return _rejecting(a, b, aStart: aStart.advanced(by: 1), bStart: bStart.advanced(by: 1))
    }
    else if a[aStart] > b[bStart] {
      return _rejecting(a, b, aStart: aStart, bStart: bStart.advanced(by: 1))
    }
    else {
      return _rejecting(a, b, aStart: aStart.advanced(by: 1), bStart: bStart)
    }
  }
  return _rejecting(a, b, aStart: a.startIndex, bStart: b.startIndex)
}

Your code doesn’t work so I just did some clean up:

func rejecting<T>(_ a: [T], _ b: [T]) -> [T] where T: Comparable {
    func _rejecting(_ a: ArraySlice<T>, _ b: ArraySlice<T>) -> [T] {
        guard let firstA = a.first,
            let firstB = b.first else {
                return Array(a)
        }
        if firstA < firstB {
            return [firstA] + _rejecting(a.dropFirst(), b)
        }
        if firstA > firstB {
            return _rejecting(a, b.dropFirst())
        }
        return _rejecting(a.dropFirst(), b.dropFirst())
    }
    return _rejecting(a[...], b[...])
}

Though I don’t know whether Swift supports TCO yet.

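Your example is not TCO at all.
How can the compiler optimize it in the case of this branch? The recursive call is not in tail position here, because the + still has to run after it returns.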

if firstA < firstB {
    return [firstA] + _rejecting(a.dropFirst(), b)
}

My idea was (and is) to keep the slices immutable (and also to use an accumulator for the result, for TCO).

Consider the next enhancement.

func rejecting<T: Comparable>(_ a: ArraySlice<T>, _ b: ArraySlice<T>) -> ArraySlice<T> {
  func _rejecting(_ a: inout ArraySlice<T>, _ b: inout ArraySlice<T>, aStart: ArraySlice<T>.Index, bStart: ArraySlice<T>.Index, result: [T]) -> [T] {
    if aStart == a.endIndex || bStart == b.endIndex {
        return result.appending(a[aStart...])
    }
    return _rejecting(&a, &b, aStart: aStart.advanced(by: a[aStart] <= b[bStart] ? 1 : 0), bStart: bStart.advanced(by: a[aStart] >= b[bStart] ? 1 : 0), result: a[aStart] < b[bStart] ? result.appending(a[aStart]) : result )
  }
  var inoutA = a
  var inoutB = b
  return ArraySlice(_rejecting(&inoutA, &inoutB, aStart: a.startIndex, bStart: b.startIndex, result: []))
}

And the related extension on Array:

extension Array where Element: Comparable {
  func appending(_ a: Element) -> Array {
    return self + [a]
  }
  func appending(_ s: [Element]) -> Array {
    return self + s
  }
  // `as` is a keyword, so the parameter is named `slice` instead.
  func appending(_ slice: ArraySlice<Element>) -> Array {
    return self + slice
  }
}

The name for this operation is “symmetric difference”.

Edit: My mistake, it is subtraction. Thanks @mayoff

The symmetric difference would include elements of B that are not in A. The algorithm described by lolgear does not include such elements.
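
As a small illustration of the distinction, the standard library's Set offers both operations. This is only a sketch of the terminology: Set discards duplicates, so it does not solve the counting task from the original question, and the variable names are made up for the example.

```swift
// Sets drop duplicates, so this only illustrates the two terms.
let a: Set = [1, 2, 4, 7]
let b: Set = [1, 2, 4, 6]

// Subtraction: elements of a that are not in b.
let onlyInA = a.subtracting(b)
// Symmetric difference: elements that are in exactly one of the two sets.
let inExactlyOne = a.symmetricDifference(b)

print(onlyInA.sorted())      // [7]
print(inExactlyOne.sorted()) // [6, 7]
```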


Your demand is that the function be written recursively, but execute iteratively (to handle large inputs on real hardware), by way of tail call optimization. Your demand has a fundamental problem.

The problem is that Swift is not Scheme. Scheme guarantees tail call optimization, but Swift does not. Tail call optimization that's not guaranteed to occur is not a programming model.

Since Swift uses automatic reference counting, it's often the case that Swift has to generate some clean-up reference count adjustments after a call that appears to be in tail position. These adjustments prevent the apparent tail call from being optimized.

Even if you play around for a while and discover a way to implement the algorithm recursively that Swift then compiles with tail call optimization, it's dangerous to rely on it. Slight changes in a future version of the compiler might no longer optimize the tail calls in your carefully-tuned function.
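
One way to keep the recursive shape without depending on the optimizer is a trampoline: each step returns either the final result or a closure for the next step, and a small driver runs the steps. The sketch below is an assumption-laden illustration, not something proposed in this thread. Step, rejectingStep and this rejecting are hypothetical names, the driver is itself a loop (which the original "no loops" condition excludes), and acc + [x] copies the accumulator on every step, so it favors clarity over speed.

```swift
// One step of the computation: a final result, or a thunk for the next step.
enum Step<T> {
    case done([T])
    case more(() -> Step<T>)
}

// Written in recursive style, but each "recursive call" is returned as a
// closure instead of being invoked, so the call stack does not grow.
func rejectingStep<T: Comparable>(_ a: ArraySlice<T>, _ b: ArraySlice<T>, _ acc: [T]) -> Step<T> {
    guard let x = a.first else { return .done(acc) }      // a exhausted
    guard let y = b.first else { return .done(acc + a) }  // b exhausted: keep the rest of a
    if x < y { return .more({ rejectingStep(a.dropFirst(), b, acc + [x]) }) }
    if y < x { return .more({ rejectingStep(a, b.dropFirst(), acc) }) }
    return .more({ rejectingStep(a.dropFirst(), b.dropFirst(), acc) })
}

// The driver is the only loop; it runs one step at a time at constant stack depth.
func rejecting<T: Comparable>(_ a: [T], _ b: [T]) -> [T] {
    var step = rejectingStep(a[...], b[...], [])
    while case .more(let next) = step {
        step = next()
    }
    guard case .done(let result) = step else { return [] }
    return result
}

print(rejecting([1, 1, 1, 2, 4, 7], [1, 2, 2, 4, 6])) // [1, 1, 7]
```

The trade-off is a closure allocation per step, in exchange for behavior that does not change when the compiler's optimization decisions do.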


Yeah, I was a little light-headed when I wrote it, my bad.

Just want to point out that you forgot the accumulator, which you added in your next iteration anyway.


This, by the way, was my attempt at getting Swift to apply tail-call optimization, but it still wouldn't do it:

extension Collection where Element: Comparable {
    func removing<C2: Collection>(contentsOf you: C2) -> [Element] where C2.Element == Element {
        var output: [Element] = []

        func remove(mine: Index, yours: C2.Index) {
            if mine == endIndex {
                // I have no more elements. There's no need to look at the rest of yours.
                return
            }

            else if yours == you.endIndex {
                // You have no more elements. All the rest of mine are included.
                return output.append(contentsOf: self[mine...])
            }

            else if self[mine] < you[yours] {
                output.append(self[mine])
                return remove(mine: self.index(after: mine), yours: yours)
            }

            else if you[yours] < self[mine] {
                return remove(mine: mine, yours: you.index(after: yours))
            }

            else /* self[mine] == you[yours] */ {
                return remove(mine: self.index(after: mine), yours: you.index(after: yours))
            }
        }

        remove(mine: self.startIndex, yours: you.startIndex)
        return output
    }
}

It ends up with non-optimized recursive calls that look like this:

    0x100001af4 <+260>: movzx  esi, al
    0x100001af7 <+263>: mov    rdi, r15
    0x100001afa <+266>: mov    rdx, rbx
    0x100001afd <+269>: mov    r8, r14
    0x100001b00 <+272>: mov    r9, r12
    0x100001b03 <+275>: call   0x1000019f0               ; <+0> at main.swift:36
->  0x100001b08 <+280>: add    rsp, 0x48
    0x100001b0c <+284>: pop    rbx
    0x100001b0d <+285>: pop    r12
    0x100001b0f <+287>: pop    r13
    0x100001b11 <+289>: pop    r14
    0x100001b13 <+291>: pop    r15
    0x100001b15 <+293>: pop    rbp
    0x100001b16 <+294>: ret    

The only cleanup being done after the recursive call is restoring rsp and callee-saved registers. No arguments are passed on the stack. As far as I can tell, the cleanup could be done before the recursive call, but the compiler isn't smart enough to do it.

Since you are extracting elements from the source and filtering them one at a time, always in order, the iterator interface seemed like a perfect fit. Instead of putting everything here, I put my version in a gist, but I'll copy the main method here:

extension SetSubtractionIterator: IteratorProtocol {

    public mutating func next() -> Base.Element? {
        // Can't return wrapped element if there's no more.
        lastBase = lastBase ?? base.next()
        guard let latestBase = lastBase else { return nil }

        /*
         Compare latest from base and filtered:

         - filtered is NIL: return base and increment it
         - base < filtered: return base and increment it
         - base == filtered: increment both then recursive call
         - base > filtered: increment filter then recursive call
         */
        do {
            let incrementBase: Bool
            defer {
                if incrementBase {
                    lastBase = base.next()
                    if let nextBase = lastBase, nextBase < latestBase {
                        preconditionFailure("To-be-filtered sequence isn't actually sorted")
                    }
                }
            }

            lastFilter = lastFilter ?? filters.next()
            guard let latestFilter = lastFilter, latestBase >= latestFilter else {
                incrementBase = true
                return latestBase
            }
            defer {
                lastFilter = filters.next()
                if let nextFiltered = lastFilter, nextFiltered < latestFilter {
                    preconditionFailure("Filtering sequence isn't actually sorted")
                }
            }

            incrementBase = latestBase == latestFilter
        }
        return next()
    }

}

I had to put the bulk of the code in a do-block, otherwise I'd get infinite recursion. Could someone check whether it generates the sought-after tail recursion?

Since I kept the requirements on the inputs as small as possible, this code even works with single-pass Sequences. It helps here that you require <; using a general predicate would change how the comparisons are done, would require handling thrown errors, and would probably need separate eager and lazy versions. Maybe I'll try that later; it'd need Swift 5's Result to generalize it to one copy of the algorithm.

Oh, I forgot examples:

let x = [1, 1, 1, 2, 4, 7]
Array(x.subtract(sortedFilterElements: [1, 2, 2, 4, 6]))  // [1, 1, 7]
Array(x.subtract(sortedFilterElements: [1, 4, 7]))  // [1, 1, 2]
Array(x.subtract(sortedFilterElements: EmptyCollection()))  // [1, 1, 1, 2, 4, 7]
Array(EmptyCollection().subtract(sortedFilterElements: [1, 2, 2, 4, 6]))  // []

I updated the gist with a new implementation in a different file. I tried putting all the core functionality in a single function, but I couldn't do that for the Collection adaptation. If any of you want to see whether the implementations are loop-less-ly recursive, you need to check both FullSetSubtractionImplementationIterator.next() and LazyPresortedSubtractionCollection.nextIndex(baseStart: filterStart:).

I tried to make a BidirectionalCollection extension, but trying to figure out the synchronization between the main and filter collections in reverse is too hard. I don't feel like taking a couple weeks figuring it out. From what I tried, I think it's most likely possible, though. (In forward iteration, endIndex can be a general guess, but I reimplemented it in the BC extension to get the precise answer since I assume such an answer during index(before:). Note said re-extension is linear instead of constant, just like startIndex.)