2021-08-25 - Jack: One Thread Per Core

I have been making progress with Babelia on the web user interface, the IRC interface, and also the backend.

Regarding the backend: even if Chez Scheme is fast, it is clear, even on the small dataset I have put together (around eleven gigabytes without compression), that I need something like map-reduce [1], known in the LISP world under the name for-each-parallel-map.

In full-text search, and in a search product like Google, there are tips and tricks to avoid hitting the worst case. The worst case is a query where the least frequent word is also one of the most frequent in the index. Possible workarounds include: 0) using AND as the default operator; 1) eliminating the most common words (also known as stop-words); 2) caching results; 3) approximating results with user profiling; 4) editorializing (prolly, and also somewhat algorithmically, e.g. the first result is always the same-language Wikipedia article associated with the longest Wikidata alias found in the user query)...

All those workarounds give rise to other problems, or they need a lot of work, such as profiling users. In my opinion profiling is not a problem when it is limited to users' data that is published in the open (unlike tracking search users via their queries, their mail, or their people graph...). (There is a way to implement full-text search queries, and multi-criteria multi-valued sorted queries, that are encrypted server side.)

Anyway, threads are difficult, so I wanted to give it a try. The above tries to explain where my motivation stems from.

It is still unclear whether make-jack works reliably all the time. You tell me.

(make-jack count) → procedure?

make-jack initializes a pool of COUNT parallel threads, and returns a possibly endless generator that produces jacks. A jack is made of two procedures:

  1. The first procedure is an accumulator that will consume one or more thunks. That is how the user requests the parallel execution of something.

  2. The second procedure will generate the results of the thunks submitted with the associated accumulator in an unspecified order.


;; JACK is a generator returned by make-jack, and THUNKS is a list
;; of thunks.
(call-with-values jack
   (lambda (consumer producer)
     ;; submit some work to the pool, the consumer will block
     ;; if there is too much work already scheduled.
     (consumer thunks)
     ;; pull results
     (let loop ((count (length thunks)))
       (unless (fxzero? count)
         ;; producer will block the current thread until there
         ;; is something to produce
         (display (producer))
         (loop (fx- count 1))))))
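
Since results come back in an unspecified order, a for-each-parallel-map style helper has to remember where each result belongs. Here is a minimal sketch of that idea. The name jack-map is hypothetical and not part of the code below. It only assumes that JACK behaves as described above, and that the consumer accepts the whole list of thunks in one call:

```scheme
;; Hypothetical helper, not part of the original code: an
;; order-preserving map on top of a jack.
(define (jack-map jack proc items)
  (call-with-values jack
    (lambda (consumer producer)
      (let* ((count (length items))
             (results (make-vector count #f))
             ;; Each thunk returns (index . value), so that the
             ;; value can be stored at the right place later.
             (thunks (let loop ((items items) (index 0))
                       (if (null? items)
                           '()
                           (let ((item (car items)))
                             (cons (lambda () (cons index (proc item)))
                                   (loop (cdr items) (+ index 1))))))))
        ;; submit all the work at once
        (consumer thunks)
        ;; pull exactly COUNT results, in whatever order they come
        (let loop ((remaining count))
          (unless (zero? remaining)
            (let ((index+value (producer)))
              (vector-set! results (car index+value) (cdr index+value)))
            (loop (- remaining 1))))
        (vector->list results)))))
```

With a real pool that would read something like (jack-map (make-jack 5) fib '(30 31 32)). I did not benchmark this helper.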

Here are some numbers that back up the claim that it may work as expected. The tests were done with a pool of size five. The work, called THUNKS in the above snippet, is the computation of one thousand times fibonacci of 40:

(time (call-with-values jack ...))
    no collections
    0.029955076s elapsed cpu time
    152.646622367s elapsed real time
    592688 bytes allocated
        Command being timed: "scheme --libdirs src/ --program main.scm"
        User time (seconds): 761.84
        System time (seconds): 0.04
        Percent of CPU this job got: 498%
        Elapsed (wall clock) time (h:mm:ss or m:ss): 2:32.71
        Average shared text size (kbytes): 0
        Average unshared data size (kbytes): 0
        Average stack size (kbytes): 0
        Average total size (kbytes): 0
        Maximum resident set size (kbytes): 49624
        Average resident set size (kbytes): 0
        Major (requiring I/O) page faults: 0
        Minor (reclaiming a frame) page faults: 14194
        Voluntary context switches: 1011
        Involuntary context switches: 3646
        Swaps: 0
        File system inputs: 0
        File system outputs: 0
        Socket messages sent: 0
        Socket messages received: 0
        Signals delivered: 0
        Page size (bytes): 4096
        Exit status: 0
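
For reference, the workload timed above could be built along these lines. This is a sketch under assumptions: it supposes a naive doubly recursive fibonacci, and the names fib and make-thunks are hypothetical, the actual main.scm may differ:

```scheme
;; Naive fibonacci, on purpose: it burns CPU without allocating.
(define (fib n)
  (if (< n 2)
      n
      (+ (fib (- n 1)) (fib (- n 2)))))

;; Build a list of COUNT thunks, each computing (fib n) when called.
(define (make-thunks count n)
  (let loop ((count count) (out '()))
    (if (zero? count)
        out
        (loop (- count 1) (cons (lambda () (fib n)) out)))))

;; One thousand times fibonacci of 40; nothing is computed until
;; a worker calls the thunks.
(define thunks (make-thunks 1000 40))
```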

The code:

  (define-record-type* <queue>
    ;; This is a FIFO queue that can signal when items are available
    ;; or space is available. Space is available when there are fewer
    ;; than MARK items inside the REST. It is used in jacks in both
    ;; accumulators and generators.
    (make-queue% name head rest mark mutex item-available space-available)
    (name queue-name)
    (head queue-head queue-head!)
    (rest queue-rest queue-rest!)
    ;; mark is an integer that helps keep the number of produced
    ;; and accumulated values low. Tho, there is room for starvation.
    (mark queue-mark)
    (mutex queue-mutex)
    (item-available queue-item-available)
    (space-available queue-space-available))

  (define (make-queue name mark)
    (make-queue% name '() '() mark (make-mutex) (make-condition) (make-condition)))

  (define (queue-pop! queue)
    (mutex-acquire (queue-mutex queue))
    (if (null? (queue-head queue))
        (if (null? (queue-rest queue))
            (begin
              ;; Wait for work...
              (condition-wait (queue-item-available queue) (queue-mutex queue))
              (mutex-release (queue-mutex queue))
              ;; recurse
              (queue-pop! queue))
            (let* ((item+new-head (reverse (queue-rest queue)))
                   (item (car item+new-head))
                   (new-head (cdr item+new-head)))
              ;; There is nothing in the head, but the rest has stuff:
              ;; reverse the rest to keep it FIFO, and replace the
              ;; HEAD with it. Return the first item immediately to
              ;; avoid pressure on the mutex, and for best performance.
              (queue-rest! queue '())
              (condition-signal (queue-space-available queue))
              (queue-head! queue new-head)
              (mutex-release (queue-mutex queue))
              item))
        ;; There is work, all is well.
        (let ((item (car (queue-head queue)))
              (new-head (cdr (queue-head queue))))
          (queue-head! queue new-head)
          (mutex-release (queue-mutex queue))
          item)))

  (define (queue-push! queue . items)
    (mutex-acquire (queue-mutex queue))
    ;; We only check that the rest is less than the mark, BUT the user
    ;; may append more than mark ITEMS.
    (if (fx<? (length (queue-rest queue)) (queue-mark queue))
        (begin
          (queue-rest! queue (append items (queue-rest queue)))
          ;; TODO: It may not be necessary to wake up all waiting
          ;; threads, but only (length (queue-rest queue))?
          (condition-broadcast (queue-item-available queue))
          (mutex-release (queue-mutex queue)))
        (begin
          ;; Block until some work is done.
          (condition-wait (queue-space-available queue) (queue-mutex queue))
          (mutex-release (queue-mutex queue))
          ;; TODO: here it is possible to append the items without
          ;; recursing.
          (apply queue-push! queue items))))

  (define (make-jack count)

    ;; That is the queue for all work for the created thread pool.
    ;; The mark is an arbitrary number, it could be an argument.
    (define input (make-queue 'input (fx* count 2)))

    (define (worker-loop index input)
      ;; TODO: replace thunk+output with output+thunk, and avoid the
      ;; cdr before the car.
      (let ((thunk+output (queue-pop! input)))
        (queue-push! (cdr thunk+output) ((car thunk+output)))
        (worker-loop index input)))

    (define (worker-init count)
      (let loop ((count count))
        (unless (fxzero? count)
          ;; count is passed as the thread index for debugging
          ;; purpose
          (fork-thread (lambda () (worker-loop count input)))
          (loop (fx- count 1)))))

    (define (input-consumer input output)
      (lambda (thunks)
        ;; TODO: avoid the call to apply. The reverse is necessary to
        ;; preserve the FIFO priority order.
        (apply queue-push! input (reverse (map (lambda (thunk) (cons thunk output)) thunks)))))

    (define (output-producer output)
      (lambda ()
        (queue-pop! output)))

    ;; Initialize the thread pool at the call site. It is somewhat
    ;; unusual for a generator to have side effects other than
    ;; producing values.
    (worker-init count)

    (lambda ()
      ;; again the mark is a clueless guess.
      (define output (make-queue 'output (fx* count 2)))
      (values (input-consumer input output) (output-producer output))))
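
The HEAD and REST fields of the queue follow the classic two-list FIFO trick: pushes cons onto REST (newest first), and pops take from HEAD, reversing REST into HEAD only when HEAD is empty. Here is a minimal single-threaded, purely functional sketch of that trick, without mutex or conditions. The fifo-* names are hypothetical and only meant as an illustration:

```scheme
;; A fifo is a pair (head . rest): HEAD is in pop order, REST
;; holds the most recently pushed items first.
(define (fifo-empty) (cons '() '()))

(define (fifo-empty? fifo)
  (and (null? (car fifo)) (null? (cdr fifo))))

(define (fifo-push fifo item)
  ;; pushing is a single cons on REST
  (cons (car fifo) (cons item (cdr fifo))))

(define (fifo-pop fifo)
  ;; Return two values: the oldest item, and the new fifo.
  ;; The caller must check that the fifo is not empty.
  (let ((head (car fifo))
        (rest (cdr fifo)))
    (if (null? head)
        ;; HEAD is exhausted: reverse REST once, and use it as HEAD.
        (let ((flipped (reverse rest)))
          (values (car flipped) (cons (cdr flipped) '())))
        (values (car head) (cons (cdr head) rest)))))
```

Each item is reversed at most once, so the cost of the occasional reverse is spread over many pops. In the threaded queue above, that reverse is also the only moment where space-available is signaled.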


  1. MapReduce: Simplified Data Processing on Large Clusters