How to efficiently filter an array by another array? (In the functional paradigm)


(Dmitry Lobanov) #1

Consider the following task.

Let A and B be lists of Comparable items, so that entries from either list can be compared with each other.

The output is the list of entries in A that are not in B.

Two conditions:

  1. Assume A and B are sorted.
  2. Duplicates are counted. For each value, take its count in A minus its count in B. If the difference is greater than zero, output that many copies; otherwise, output none.

Example:
A = [1, 1, 1, 2, 4, 7]
B = [1, 2, 2, 4, 6]

Output:
[1, 1, 7]

Description:
A has three 1s and one 2.
B has one 1 and two 2s.

The result has (three - one) 1s and (one - two) 2s.
That is two 1s and (-1) 2s.
So the result has two 1s and no 2s. The single 4 cancels out, and 7 is kept because B has none.
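
The count arithmetic above can be checked directly. The following is only a sketch of the specification, not a solution to the task: it ignores the "sorted" and "no loops" conditions, is fixed to Int for brevity, and `counts` is a hypothetical helper name that does not come from this thread.

```swift
// Hypothetical helper (not from the thread): tallies how often each value occurs.
func counts(_ xs: [Int]) -> [Int: Int] {
    return xs.reduce(into: [:]) { tally, x in tally[x, default: 0] += 1 }
}

let a = [1, 1, 1, 2, 4, 7]
let b = [1, 2, 2, 4, 6]
let countsA = counts(a)
let countsB = counts(b)

// For every distinct value in A, keep max(0, count in A - count in B) copies.
let expected = countsA.keys.sorted().flatMap { value in
    Array(repeating: value, count: max(0, countsA[value]! - (countsB[value] ?? 0)))
}
print(expected) // [1, 1, 7]
```

Any candidate implementation of the task should agree with this tally on sorted input.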

Functional condition:

  1. Do it without loops.

Try to solve this task for different element counts (900 to 100_000).

I am asking for an efficient (tail-call-optimized) solution.
Is it possible?

func generateArray(count: UInt = 900, range: UInt = 1000) -> [UInt] {
    return (0..<count).map { _ in UInt.random(in: 0...range) }
}

func test_ListsDifference() {
    let count: UInt = 10_000  // UInt, to match generateArray(count:)
    let a = generateArray(count: count).sorted()
    let b = generateArray(count: count).sorted()

    var result = [UInt]()
    self.measure {
        // rejecting takes and returns ArraySlice, so convert at the boundary.
        result = Array(rejecting(a[...], b[...]))
    }
}

My dirty solution so far:

func rejecting<T: Comparable>(_ a: ArraySlice<T>, _ b: ArraySlice<T>) -> ArraySlice<T> {
  func _rejecting(_ a: ArraySlice<T>, _ b: ArraySlice<T>, aStart: ArraySlice<T>.Index, bStart: ArraySlice<T>.Index) -> ArraySlice<T> {
    if aStart == a.endIndex {
      return []
    }
    else if bStart == b.endIndex {
      return a[aStart..<a.endIndex]
    }
    else if a[aStart] == b[bStart] {
      return _rejecting(a, b, aStart: aStart.advanced(by: 1), bStart: bStart.advanced(by: 1))
    }
    else if a[aStart] > b[bStart] {
      return _rejecting(a, b, aStart: aStart, bStart: bStart.advanced(by: 1))
    }
    else {
      return _rejecting(a, b, aStart: aStart.advanced(by: 1), bStart: bStart)
    }
  }
  return _rejecting(a, b, aStart: a.startIndex, bStart: b.startIndex)
}

#2

Your code doesn’t work so I just did some clean up:

func rejecting<T>(_ a: [T], _ b: [T]) -> [T] where T: Comparable {
    func _rejecting(_ a: ArraySlice<T>, _ b: ArraySlice<T>) -> [T] {
        guard let firstA = a.first,
            let firstB = b.first else {
                return Array(a)
        }
        if firstA < firstB {
            return [firstA] + _rejecting(a.dropFirst(), b)
        }
        if firstA > firstB {
            return _rejecting(a, b.dropFirst())
        }
        return _rejecting(a.dropFirst(), b.dropFirst())
    }
    return _rejecting(a[...], b[...])
}

Though I don’t know if Swift already supports TCO.


(Dmitry Lobanov) #3

Your example is not TCO at all.
How can the compiler optimize it in the case of this branch?

if firstA < firstB {
    return [firstA] + _rejecting(a.dropFirst(), b)
}

My idea was (and is) to keep the slices immutable (and also to use an accumulator for the result, for TCO).

Consider the next enhancement.

func rejecting<T: Comparable>(_ a: ArraySlice<T>, _ b: ArraySlice<T>) -> ArraySlice<T> {
  func _rejecting(_ a: inout ArraySlice<T>, _ b: inout ArraySlice<T>, aStart: ArraySlice<T>.Index, bStart: ArraySlice<T>.Index, result: [T]) -> [T] {
    if aStart == a.endIndex || bStart == b.endIndex {
        return result.appending(a[aStart...])
    }
    // Advance in a unless a[aStart] is the larger element, and in b unless b[bStart] is the larger one;
    // keep a[aStart] only when it is strictly smaller than b[bStart].
    return _rejecting(&a, &b,
                      aStart: aStart.advanced(by: a[aStart] <= b[bStart] ? 1 : 0),
                      bStart: bStart.advanced(by: a[aStart] >= b[bStart] ? 1 : 0),
                      result: a[aStart] < b[bStart] ? result.appending(a[aStart]) : result)
  }
  var inoutA = a
  var inoutB = b
  return ArraySlice(_rejecting(&inoutA, &inoutB, aStart: a.startIndex, bStart: b.startIndex, result: []))
}

And the related extension on Array:

extension Array where Element: Comparable {
  func appending(_ a: Element) -> Array {
    return self + [a]
  }
  func appending(_ s: [Element]) -> Array {
    return self + s
  }
  func appending(_ as: ArraySlice<Element>) -> Array {
    return self + `as`
  }
}

#4

The name for this operation is “symmetric difference”.

Edit: My mistake, it is subtraction. Thanks @mayoff


(Rob Mayoff) #5

The symmetric difference would include elements of B that are not in A. The algorithm described by lolgear does not include such elements.


(Rob Mayoff) #6

Your demand is that the function be written recursively, but execute iteratively (to handle large inputs on real hardware), by way of tail call optimization. Your demand has a fundamental problem.

The problem is that Swift is not Scheme. Scheme guarantees tail call optimization, but Swift does not. Tail call optimization that's not guaranteed to occur is not a programming model.

Since Swift uses automatic reference counting, it's often the case that Swift has to generate some clean-up reference count adjustments after a call that appears to be in tail position. These adjustments prevent the apparent tail call from being optimized.

Even if you play around for a while and discover a way to implement the algorithm recursively that Swift then compiles with tail call optimization, it's dangerous to rely on it. Slight changes in a future version of the compiler might no longer optimize the tail calls in your carefully-tuned function.
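
For comparison, here is a minimal sketch of the iterative form that the tail calls would have to compile down to. It deliberately uses an explicit loop, so it does not satisfy the "no loops" condition from the first post, but its stack use does not depend on what the optimizer does. The name `subtractingSorted` is made up for this illustration, and both inputs are assumed to be sorted.

```swift
// Sketch only: sorted multiset subtraction with an explicit loop.
// `subtractingSorted` is a hypothetical name, not from this thread.
func subtractingSorted<T: Comparable>(_ a: [T], _ b: [T]) -> [T] {
    var output: [T] = []
    var i = a.startIndex
    var j = b.startIndex
    while i < a.endIndex, j < b.endIndex {
        if a[i] < b[j] {
            // Nothing in b can cancel a[i], so keep it.
            output.append(a[i])
            i += 1
        } else if b[j] < a[i] {
            // b[j] matches nothing left in a, so skip it.
            j += 1
        } else {
            // Equal elements cancel one-for-one.
            i += 1
            j += 1
        }
    }
    // b is exhausted (or a is): whatever remains of a is kept.
    output.append(contentsOf: a[i...])
    return output
}

print(subtractingSorted([1, 1, 1, 2, 4, 7], [1, 2, 2, 4, 6])) // [1, 1, 7]
```

Each branch corresponds to one of the recursive cases discussed above, with the accumulator turned into a local variable.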


#7

Yeah, yeah, I was a little light-headed when I was writing it, my bad.

Just want to point out that you forgot the accumulator, which you added in your next iteration anyway.


(Rob Mayoff) #8

This, by the way, was my attempt at getting Swift to apply tail-call optimization, but it still wouldn't do it:

extension Collection where Element: Comparable {
    func removing<C2: Collection>(contentsOf you: C2) -> [Element] where C2.Element == Element {
        var output: [Element] = []

        func remove(mine: Index, yours: C2.Index) {
            if mine == endIndex {
                // I have no more elements. There's no need to look at the rest of yours.
                return
            }

            else if yours == you.endIndex {
                // You have no more elements. All the rest of mine are included.
                return output.append(contentsOf: self[mine...])
            }

            else if self[mine] < you[yours] {
                output.append(self[mine])
                return remove(mine: self.index(after: mine), yours: yours)
            }

            else if you[yours] < self[mine] {
                return remove(mine: mine, yours: you.index(after: yours))
            }

            else /* self[mine] == you[yours] */ {
                return remove(mine: self.index(after: mine), yours: you.index(after: yours))
            }
        }

        remove(mine: self.startIndex, yours: you.startIndex)
        return output
    }
}

It ends up with non-optimized recursive calls that look like this:

    0x100001af4 <+260>: movzx  esi, al
    0x100001af7 <+263>: mov    rdi, r15
    0x100001afa <+266>: mov    rdx, rbx
    0x100001afd <+269>: mov    r8, r14
    0x100001b00 <+272>: mov    r9, r12
    0x100001b03 <+275>: call   0x1000019f0               ; <+0> at main.swift:36
->  0x100001b08 <+280>: add    rsp, 0x48
    0x100001b0c <+284>: pop    rbx
    0x100001b0d <+285>: pop    r12
    0x100001b0f <+287>: pop    r13
    0x100001b11 <+289>: pop    r14
    0x100001b13 <+291>: pop    r15
    0x100001b15 <+293>: pop    rbp
    0x100001b16 <+294>: ret    

The only cleanup being done after the recursive call is restoring rsp and callee-saved registers. No arguments are passed on the stack. As far as I can tell, the cleanup could be done before the recursive call, but the compiler isn't smart enough to do it.


(Daryle Walker) #9

Since you are extracting elements from the source and filtering them one at a time, always in order, it seemed that the iterator interface would be perfect. Instead of putting it all here, I put my version in a gist. But I'll copy the business method here:

extension SetSubtractionIterator: IteratorProtocol {

    public mutating func next() -> Base.Element? {
        // Can't return wrapped element if there's no more.
        lastBase = lastBase ?? base.next()
        guard let latestBase = lastBase else { return nil }

        /*
         Compare latest from base and filtered:

         - filtered is NIL: return base and increment it
         - base < filtered: return base and increment it
         - base == filtered: increment both then recursive call
         - base > filtered: increment filter then recursive call
         */
        do {
            let incrementBase: Bool
            defer {
                if incrementBase {
                    lastBase = base.next()
                    if let nextBase = lastBase, nextBase < latestBase {
                        preconditionFailure("To-be-filtered sequence isn't actually sorted")
                    }
                }
            }

            lastFilter = lastFilter ?? filters.next()
            guard let latestFilter = lastFilter, latestBase >= latestFilter else {
                incrementBase = true
                return latestBase
            }
            defer {
                lastFilter = filters.next()
                if let nextFiltered = lastFilter, nextFiltered < latestFilter {
                    preconditionFailure("Filtering sequence isn't actually sorted")
                }
            }

            incrementBase = latestBase == latestFilter
        }
        return next()
    }

}

I had to put the bulk of the code in a do-block; otherwise I'd get infinite recursion. Could someone check if it generates the sought-after tail recursion?

Since I kept the requirements on the inputs as small as possible, this code will even work with single-pass Sequences. It helps here that you only require <. Using a general predicate would change how the comparisons are done, would need to handle thrown errors, and would probably need eager and lazy versions. Maybe I'll try that later; it'd need Swift 5's Result to generalize it to one copy of the algorithm.
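
If the tail call in `next()` turns out not to be optimized, the same lazy, single-pass behavior can be had with a loop in place of `return next()`. The following is a rough sketch under that assumption, not the gist's code: `subtractingLazily` is a hypothetical free function, it omits the gist's sortedness preconditions, and it fetches the first filter element eagerly.

```swift
// Sketch only: lazily yields the elements of sorted `base` that remain after
// subtracting sorted `filter`. `subtractingLazily` is a hypothetical name.
func subtractingLazily<S1: Sequence, S2: Sequence>(_ base: S1, _ filter: S2) -> AnySequence<S1.Element>
    where S1.Element: Comparable, S2.Element == S1.Element
{
    var baseIterator = base.makeIterator()
    var filterIterator = filter.makeIterator()
    var pendingFilter = filterIterator.next()  // fetched eagerly, for simplicity

    let iterator = AnyIterator<S1.Element> {
        while let candidate = baseIterator.next() {
            // Drop filter elements that are smaller than the candidate.
            while let f = pendingFilter, f < candidate {
                pendingFilter = filterIterator.next()
            }
            if let f = pendingFilter, f == candidate {
                // One filter element cancels one equal base element.
                pendingFilter = filterIterator.next()
                continue
            }
            return candidate
        }
        return nil
    }
    // The result shares one iterator state, so it is itself single-pass.
    return AnySequence(iterator)
}

print(Array(subtractingLazily([1, 1, 1, 2, 4, 7], [1, 2, 2, 4, 6]))) // [1, 1, 7]
```

Because it only calls `next()` on each input, it makes no more demands on the inputs than the iterator above does.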


(Daryle Walker) #10

Oh, I forgot examples:

let x = [1, 1, 1, 2, 4, 7]
Array(x.subtract(sortedFilterElements: [1, 2, 2, 4, 6]))  // [1, 1, 7]
Array(x.subtract(sortedFilterElements: [1, 4, 7]))  // [1, 1, 2]
Array(x.subtract(sortedFilterElements: EmptyCollection()))  // [1, 1, 1, 2, 4, 7]
Array(EmptyCollection().subtract(sortedFilterElements: [1, 2, 2, 4, 6]))  // []