All files PaginationStream.ts

94.73% Statements 36/38
95.45% Branches 21/22
100% Functions 3/3
94.73% Lines 36/38

Press n or j to go to the next uncovered block, b, p or k for the previous block.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 621x                     1x 1x   8x   8x   8x   8x   1x 8x 8x 8x   1x 915x 606x 606x 606x 606x   915x 5x 5x 5x   304x 304x 908x 302x 302x       915x 2x 2x 2x   301x 301x   301x 914x     915x 1x  
import { Readable } from 'node:stream'
import type { PaginationList, PaginationListWithCount } from './apiTypes.ts'
 
type FetchPage<T> = (
  pageno: number,
) =>
  | PaginationList<T>
  | PromiseLike<PaginationList<T>>
  | PaginationListWithCount<T>
  | PromiseLike<PaginationListWithCount<T>>
 
export default class PaginationStream<T> extends Readable {
  private _fetchPage: FetchPage<T>
 
  private _nitems?: number
 
  private _pageno = 0
 
  private _items: T[] = []
 
  private _itemsRead = 0
 
  constructor(fetchPage: FetchPage<T>) {
    super({ objectMode: true })
    this._fetchPage = fetchPage
  }
 
  override async _read() {
    if (this._items.length > 0) {
      this._itemsRead++
      process.nextTick(() => this.push(this._items.pop()))
      return
    }
 
    if (this._nitems != null && this._itemsRead >= this._nitems) {
      process.nextTick(() => this.push(null))
      return
    }
 
    try {
      const { items, ...rest } = await this._fetchPage(++this._pageno)
      if ('count' in rest) {
        this._nitems = rest.count
      }
 
      // Some endpoints can return a non-zero `count` even when the current query/page yields no
      // items (e.g. filtering by date range). An empty page is a reliable signal that we are done.
      if (items.length === 0) {
        process.nextTick(() => this.push(null))
        return
      }
 
      this._items = Array.from(items)
      this._items.reverse()
 
      this._read()
    } catch (err) {
      this.emit('error', err)
    }
  }
}