All files PaginationStream.ts

94.73% Statements 36/38
95.45% Branches 21/22
100% Functions 3/3
94.73% Lines 36/38

Press n or j to go to the next uncovered block, b, p or k for the previous block.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 621x                     1x 1x   8x   8x   8x   8x   1x 8x 8x 8x   1x 1296x 860x 860x 860x 860x   1296x 5x 5x 5x   431x 431x 1289x 429x 429x       1296x 2x 2x 2x   428x 428x   428x 1295x     1296x 1x  
import { Readable } from 'node:stream'
import type { PaginationList, PaginationListWithCount } from './apiTypes.ts'
 
/**
 * Callback that loads a single page of results.
 *
 * It receives the page number to load and may hand back the page either directly or wrapped in a
 * thenable. The page may or may not carry a `count` alongside its `items`.
 */
type FetchPage<T> = (pageno: number) =>
  // Synchronous results
  | PaginationList<T>
  | PaginationListWithCount<T>
  // Awaitable results
  | PromiseLike<PaginationList<T>>
  | PromiseLike<PaginationListWithCount<T>>
 
/**
 * Object-mode `Readable` that emits the items of a paginated listing one at a time.
 *
 * Pages are requested on demand through the `fetchPage` callback, starting at page 1 and
 * incrementing by one for every request. The stream ends when either
 *  - a page reported a `count` and at least that many items have been emitted, or
 *  - a page comes back with no items.
 *
 * If `fetchPage` throws or rejects, the stream is destroyed with that error, so `'error'` is
 * emitted and no further pages are requested.
 */
export default class PaginationStream<T> extends Readable {
  /** Callback used to load a page by its number. */
  private _fetchPage: FetchPage<T>

  /** Most recent `count` reported by a fetched page; undefined until a page reports one. */
  private _nitems?: number

  /** Number of the last page requested (0 before the first request). */
  private _pageno = 0

  /** Items of the current page still to be emitted, stored reversed so `pop()` yields them in order. */
  private _items: T[] = []

  /** How many items have been handed to the stream so far. */
  private _itemsRead = 0

  /**
   * @param fetchPage - Loads the page with the given number; may return the page or a thenable.
   */
  constructor(fetchPage: FetchPage<T>) {
    super({ objectMode: true })
    this._fetchPage = fetchPage
  }

  /**
   * Supplies the next item to the stream, fetching the next page when the buffer is empty.
   *
   * Failures of `fetchPage` are routed through `destroy()` instead of being thrown or emitted
   * manually, which is how `Readable` implementations are expected to report `_read` errors.
   */
  override async _read(): Promise<void> {
    // Serve from the buffered page first.
    if (this._items.length > 0) {
      this._itemsRead++
      process.nextTick(() => this.push(this._items.pop()))
      return
    }

    // Buffer is empty: stop if the reported total has been reached.
    if (this._nitems != null && this._itemsRead >= this._nitems) {
      process.nextTick(() => this.push(null))
      return
    }

    try {
      const { items, ...rest } = await this._fetchPage(++this._pageno)

      // The stream may have been destroyed while the request was in flight; nothing left to do.
      if (this.destroyed) {
        return
      }

      if ('count' in rest) {
        this._nitems = rest.count
      }

      // Some endpoints can return a non-zero `count` even when the current query/page yields no
      // items (e.g. filtering by date range). An empty page is a reliable signal that we are done.
      if (items.length === 0) {
        process.nextTick(() => this.push(null))
        return
      }

      // Copy so the caller's array is not mutated, then reverse so `pop()` emits in page order.
      this._items = Array.from(items)
      this._items.reverse()

      // Emit the first item of the freshly loaded page. Awaited so the promise is not left
      // floating and any unexpected failure lands in the catch below.
      await this._read()
    } catch (err) {
      // Errors raised while reading must go through destroy(); emitting 'error' by hand leaves the
      // stream in an undefined state. Non-Error rejection values are wrapped so listeners always
      // receive an Error.
      this.destroy(err instanceof Error ? err : new Error(String(err)))
    }
  }
}