All files / src ast.js

86.26% Statements 157/182
77.78% Branches 70/90
93.48% Functions 43/46
86.11% Lines 155/180
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439                                                                  260x 5x   255x               249x 249x 249x                                                                   45x 45x 45x     45x             394x 347x 347x   394x 50x   344x       48x 47x 44x     71x 71x 67x     48x 46x 42x     36x 35x 33x     35x 34x 32x     22x 22x 21x               
      45x   45x 17x 17x   17x 39x   28x 28x 20x   28x 8x   28x 4x   24x       28x 28x   120x 62x   120x 50x   120x 28x   120x 120x   120x 103x         122x       31x 10x   23x 21x         21x   31x         42x   13x     29x   42x       21x   6x   21x   6x       19x 15x             15x   21x         28x           122x             348x 329x 329x 329x   141x 141x 188x   91x   238x   238x 238x     97x 429x       141x 141x 141x   139x   4x   135x     2x 139x       238x           238x         10x 10x   10x 10x     98x     11x     36x     10x     10x     50x   50x     138x 5x   133x 138x 5x   133x     133x           67x   67x 67x           44x 68x 44x 44x   44x 29x     15x   44x           33x 33x 33x 33x           32x 32x 32x 32x           42x 42x 42x 42x 42x           21x   21x             4x 4x       3x       4x 4x 1x   3x                                
import { Observable } from 'rxjs/Observable'
import 'rxjs/add/observable/empty'
 
import 'rxjs/add/operator/publishReplay'
import 'rxjs/add/operator/scan'
import 'rxjs/add/operator/filter'
import 'rxjs/add/operator/map'
import 'rxjs/add/operator/toArray'
import 'rxjs/add/operator/defaultIfEmpty'
import 'rxjs/add/operator/ignoreElements'
import 'rxjs/add/operator/merge'
import 'rxjs/add/operator/mergeMap'
import 'rxjs/add/operator/take'
 
import snakeCase from 'snake-case'
import deepEqual from 'deep-equal'
 
import checkArgs from './util/check-args'
import validIndexValue from './util/valid-index-value.js'
import { serialize } from './serialization.js'
 
import watchRewrites from './hacks/watch-rewrites'
 
 
/**
 @this TermBase

 Validation check to throw an exception if a method is chained onto a
 query where it is not allowed, or onto a query that already has it.
 It belongs to TermBase, but we don't want to pollute the objects
 with it (since it isn't useful to api users), so it's dynamically
 bound with .call inside methods that use it.

 @param {string} key camelCase name of the method being chained
 @throws {Error} if `key` is not listed in `this._legalMethods`, or
   if the snake_case form of `key` is already a key of `this._query`
*/
function checkIfLegalToChain(key) {
  if (this._legalMethods.indexOf(key) === -1) {
    throw new Error(`${key} cannot be called on the current query`)
  }
  // Query options are keyed in snake_case (e.g. findAll -> find_all)
  if (snakeCase(key) in this._query) {
    throw new Error(`${key} has already been called on this query`)
  }
}
 
// Abstract base class for terms
//
// A term holds the function used to send requests, the query options
// accumulated so far, and the names of the methods that may still be
// chained onto it. Chaining never mutates a term; each chainable
// method returns a new term built from this one's query.
export class TermBase {
  // `sendRequest` is called as sendRequest(requestType, options)
  // `query` is the options object describing the query so far
  // `legalMethods` is an array of method names that may be chained
  constructor(sendRequest, query, legalMethods) {
    this._sendRequest = sendRequest
    this._query = query
    this._legalMethods = legalMethods
  }

  // Renders the query as the chain of calls that would build it,
  // e.g. Collection('users').findAll([{"id":1}]).limit(5)
  toString() {
    const query = this._query
    let string = `Collection('${query.collection}')`
    if (query.find) {
      string += `.find(${JSON.stringify(query.find)})`
    }
    if (query.find_all) {
      string += `.findAll(${JSON.stringify(query.find_all)})`
    }
    if (query.order) {
      string += `.order(${JSON.stringify(query.order[0])}, ` +
                       `${JSON.stringify(query.order[1])})`
    }
    if (query.above) {
      string += `.above(${JSON.stringify(query.above[0])}, ` +
                       `${JSON.stringify(query.above[1])})`
    }
    if (query.below) {
      string += `.below(${JSON.stringify(query.below[0])}, ` +
                       `${JSON.stringify(query.below[1])})`
    }
    // Compare against null so that a limit of 0 is still rendered
    if (query.limit != null) {
      string += `.limit(${query.limit})`
    }
    return string
  }
  // Returns a sequence of the result set. Every time it changes the
  // updated sequence will be emitted. If raw change objects are
  // needed, pass the option 'rawChanges: true'. An observable is
  // returned which will lazily emit the query when it is subscribed
  // to
  watch({ rawChanges = false } = {}) {
    const query = watchRewrites(this, this._query)
    const raw = this._sendRequest('subscribe', query)
    if (rawChanges) {
      return raw
    } else {
      // Presentation is driven by the original query, not the
      // rewritten one that was sent
      return makePresentable(raw, this._query)
    }
  }
  // Grab a snapshot of the current query (non-changefeed). Emits an
  // array with all results, or for a `find` query the single result
  // (null if there is none). An observable is returned which will
  // lazily emit the query when subscribed to
  fetch() {
    const raw = this._sendRequest('query', this._query).map(val => {
      // Strip the version field before handing documents to the user
      delete val.$hz_v$
      return val
    })
    if (this._query.find) {
      return raw.defaultIfEmpty(null)
    } else {
      return raw.toArray()
    }
  }
  // Each chainable method below first checks that the call is legal
  // on this term and that its arguments are acceptable, then returns
  // a new term carrying the extended query.
  findAll(...fieldValues) {
    checkIfLegalToChain.call(this, 'findAll')
    checkArgs('findAll', arguments, { maxArgs: 100 })
    return new FindAll(this._sendRequest, this._query, fieldValues)
  }
  find(idOrObject) {
    checkIfLegalToChain.call(this, 'find')
    checkArgs('find', arguments)
    return new Find(this._sendRequest, this._query, idOrObject)
  }
  order(fields, direction = 'ascending') {
    checkIfLegalToChain.call(this, 'order')
    checkArgs('order', arguments, { minArgs: 1, maxArgs: 2 })
    return new Order(this._sendRequest, this._query, fields, direction)
  }
  above(aboveSpec, bound = 'closed') {
    checkIfLegalToChain.call(this, 'above')
    checkArgs('above', arguments, { minArgs: 1, maxArgs: 2 })
    return new Above(this._sendRequest, this._query, aboveSpec, bound)
  }
  below(belowSpec, bound = 'open') {
    checkIfLegalToChain.call(this, 'below')
    checkArgs('below', arguments, { minArgs: 1, maxArgs: 2 })
    return new Below(this._sendRequest, this._query, belowSpec, bound)
  }
  limit(size) {
    checkIfLegalToChain.call(this, 'limit')
    checkArgs('limit', arguments)
    return new Limit(this._sendRequest, this._query, size)
  }
}
 
// Turn a raw observable of server responses into user-presentable events
//
// `observable` is the base observable with full responses coming from
//              the HorizonSocket
// `query` is the value of `options` in the request
//
// For a point query (`query.find` is set) the result emits the latest
// document value; otherwise it emits the current array of results,
// starting once a 'synced' state message has been seen.
//
// All bookkeeping lives in the scan accumulator, and a new accumulator
// object is produced for every change. The seed is never mutated, so
// every subscription to the returned observable starts from a clean
// state.
function makePresentable(observable, query) {
  // Whether the entire data structure is in each change
  const pointQuery = Boolean(query.find)

  if (pointQuery) {
    const seedVal = { emitted: false, skip: false, val: null }
    return observable
      .scan((state, change) => {
        if (state.emitted && change.type === 'state') {
          // Once something has been emitted, state messages are
          // dropped without touching the current value
          return { emitted: true, skip: true, val: state.val }
        }
        stripVersionField(change)
        // Simplest case: just pass through new_val, except that a
        // 'synced' message re-emits the previous value
        const val = change.state === 'synced' ? state.val : change.new_val
        return { emitted: true, skip: false, val }
      }, seedVal)
      .filter(state => !state.skip)
      .map(state => state.val)
  } else {
    const seedVal = { emitted: false, val: [] }
    return observable
      .scan((state, change) => {
        stripVersionField(change)
        return {
          emitted: state.emitted || change.state === 'synced',
          // applyChange works in place, so hand it a copy
          val: applyChange(state.val.slice(), change),
        }
      }, seedVal)
      .filter(state => state.emitted)
      .map(state => state.val)
  }
}

// Removes the $hz_v$ version field from the documents carried by a change
function stripVersionField(change) {
  if (change.new_val != null) {
    delete change.new_val.$hz_v$
  }
  if (change.old_val != null) {
    delete change.old_val.$hz_v$
  }
}
 
// Applies a single change message to an array of results.
//
// `arr` is modified in place and also returned, so pass a copy if the
// original must be preserved. `change.type` selects the operation;
// when the change carries `old_offset` / `new_offset` those positions
// are used, otherwise documents are matched on their `id`.
//
// Throws an Error if the document to remove or replace can't be found
// by id, or if `change.type` is not recognized.
export function applyChange(arr, change) {
  switch (change.type) {
  case 'remove':
  case 'uninitial': {
    // Remove old values from the array
    if (change.old_offset != null) {
      arr.splice(change.old_offset, 1)
    } else {
      const index = arr.findIndex(x => deepEqual(x.id, change.old_val.id))
      if (index === -1) {
        // Programming error. This should not happen
        throw new Error(
          `change couldn't be applied: ${JSON.stringify(change)}`)
      }
      arr.splice(index, 1)
    }
    break
  }
  case 'add':
  case 'initial': {
    // Add new values to the array
    if (change.new_offset != null) {
      // If we have an offset, put it in the correct location
      arr.splice(change.new_offset, 0, change.new_val)
    } else {
      // otherwise for unordered results, push it on the end
      arr.push(change.new_val)
    }
    break
  }
  case 'change': {
    // Modify in place if a change is happening
    if (change.old_offset != null) {
      // Remove the old document from the results
      arr.splice(change.old_offset, 1)
    }
    if (change.new_offset != null) {
      // Splice in the new val if we have an offset
      arr.splice(change.new_offset, 0, change.new_val)
    } else {
      // If we don't have an offset, find the old val and
      // replace it with the new val
      const index = arr.findIndex(x => deepEqual(x.id, change.old_val.id))
      if (index === -1) {
        // indicates a programming bug. The server gives us the
        // ordering, so if we don't find the id it means something is
        // buggy.
        throw new Error(
          `change couldn't be applied: ${JSON.stringify(change)}`)
      }
      arr[index] = change.new_val
    }
    break
  }
  case 'state': {
    // This gets hit if we have not emitted yet, and should
    // result in an empty array being output.
    break
  }
  default:
    throw new Error(
      `unrecognized 'type' field from server ${JSON.stringify(change)}`)
  }
  return arr
}
 
/** @this Collection
 Implements writeOps for the Collection class

 `name` is the request type to send (e.g. 'store')
 `args` is the caller's `arguments` object, validated with checkArgs
 `documents` is either a single document or an array of documents

 Returns an observable of the server's responses. An empty array of
 documents short-circuits to an empty observable without sending a
 request. Unless `this._lazyWrites` is set, the request is sent
 immediately and the responses are replayed to later subscribers.
*/
function writeOp(name, args, documents) {
  checkArgs(name, args)
  let isBatch = true
  let wrappedDocs = documents
  if (!Array.isArray(documents)) {
    // Wrap in an array if we need to
    wrappedDocs = [ documents ]
    isBatch = false
  } else if (documents.length === 0) {
    // Don't bother sending no-ops to the server
    return Observable.empty()
  }
  const options = Object.assign(
    {}, this._query, { data: serialize(wrappedDocs) })
  let observable = this._sendRequest(name, options)
  if (isBatch) {
    // If this is a batch writeOp, each document may succeed or fail
    // individually.
    observable = observable.map(
      resp => resp.error ? new Error(resp.error) : resp)
  } else {
    // If this is a single writeOp, the entire operation should fail
    // if any fails.
    const _prevOb = observable
    // The inner subscription is returned so that unsubscribing from
    // the outer observable also tears down the underlying request
    observable = Observable.create(subscriber =>
      _prevOb.subscribe({
        next(resp) {
          if (resp.error) {
            // TODO: handle error ids when we get them
            subscriber.error(new Error(resp.error))
          } else {
            subscriber.next(resp)
          }
        },
        error(err) { subscriber.error(err) },
        complete() { subscriber.complete() },
      }))
  }
  if (!this._lazyWrites) {
    // Need to buffer response since this becomes a hot observable and
    // when we subscribe matters
    observable = observable.publishReplay().refCount()
    observable.subscribe()
  }
  return observable
}
 
// Root term for a named collection. Besides the chainable query
// methods inherited from TermBase it offers the write operations,
// all of which are delegated to writeOp.
export class Collection extends TermBase {
  constructor(sendRequest, collectionName, lazyWrites) {
    super(
      sendRequest,
      { collection: collectionName },
      [ 'find', 'findAll', 'order', 'above', 'below', 'limit' ])
    // Read by writeOp to decide whether to send writes eagerly
    this._lazyWrites = lazyWrites
  }
  store(documents) {
    return writeOp.call(this, 'store', arguments, documents)
  }
  upsert(documents) {
    return writeOp.call(this, 'upsert', arguments, documents)
  }
  insert(documents) {
    return writeOp.call(this, 'insert', arguments, documents)
  }
  replace(documents) {
    return writeOp.call(this, 'replace', arguments, documents)
  }
  update(documents) {
    return writeOp.call(this, 'update', arguments, documents)
  }
  // Accepts either a document or a bare id; a bare id is wrapped
  // as { id }
  remove(documentOrId) {
    let target = documentOrId
    if (validIndexValue(documentOrId)) {
      target = { id: documentOrId }
    }
    return writeOp.call(this, 'remove', arguments, target)
  }
  // Array version of remove: every bare id in the array is wrapped
  // as { id }
  removeAll(documentsOrIds) {
    if (!Array.isArray(documentsOrIds)) {
      throw new Error('removeAll takes an array as an argument')
    }
    const targets = documentsOrIds.map(
      entry => validIndexValue(entry) ? { id: entry } : entry)
    return writeOp.call(this, 'removeAll', arguments, targets)
  }
}
 
// Term produced by .find(). Nothing may be chained after it.
export class Find extends TermBase {
  constructor(sendRequest, previousQuery, idOrObject) {
    // A bare id is shorthand for { id: <value> }
    let selector = idOrObject
    if (validIndexValue(idOrObject)) {
      selector = { id: idOrObject }
    }
    super(
      sendRequest,
      Object.assign({}, previousQuery, { find: selector }),
      [])
  }
}
 
// Term produced by .findAll()
export class FindAll extends TermBase {
  constructor(sendRequest, previousQuery, fieldValues) {
    // Bare ids are shorthand for { id: <value> }
    const selectors = fieldValues.map(value => {
      if (validIndexValue(value)) {
        return { id: value }
      }
      return value
    })
    // The vararg version of findAll cannot have anything chained to it
    const chainable = selectors.length === 1 ?
          [ 'order', 'above', 'below', 'limit' ] : []
    super(
      sendRequest,
      Object.assign({}, previousQuery, { find_all: selectors }),
      chainable)
  }
}
 
// Term produced by .above(); stores the bound as [ spec, bound ]
export class Above extends TermBase {
  constructor(sendRequest, previousQuery, aboveSpec, bound) {
    super(
      sendRequest,
      Object.assign({}, previousQuery, { above: [ aboveSpec, bound ] }),
      [ 'findAll', 'order', 'below', 'limit' ])
  }
}
 
// Term produced by .below(); stores the bound as [ spec, bound ]
export class Below extends TermBase {
  constructor(sendRequest, previousQuery, belowSpec, bound) {
    super(
      sendRequest,
      Object.assign({}, previousQuery, { below: [ belowSpec, bound ] }),
      [ 'findAll', 'order', 'above', 'limit' ])
  }
}
 
// Term produced by .order(); stores the ordering as
// [ fields, direction ]
export class Order extends TermBase {
  constructor(sendRequest, previousQuery, fields, direction) {
    // A single field is normalized to a one-element array
    let fieldList = fields
    if (!Array.isArray(fields)) {
      fieldList = [ fields ]
    }
    super(
      sendRequest,
      Object.assign({}, previousQuery, { order: [ fieldList, direction ] }),
      [ 'findAll', 'above', 'below', 'limit' ])
  }
}
 
// Term produced by .limit()
export class Limit extends TermBase {
  constructor(sendRequest, previousQuery, size) {
    const noFurtherChaining = [] // Nothing is legal to chain after .limit
    super(
      sendRequest,
      Object.assign({}, previousQuery, { limit: size }),
      noFurtherChaining)
  }
}
 
 
// Term for the current user's document in the 'users' collection.
//
// `hz` is called with a collection name and must return a term that
//      supports .find()
// `handshake` is an observable of handshake responses; the `id`
//             field of a response is used as the user's id
// `socket` is an observable whose elements are ignored before it is
//          merged with `handshake`
export class UserDataTerm {
  constructor(hz, handshake, socket) {
    this._hz = hz
    this._before = socket.ignoreElements().merge(handshake)
  }

  // Builds the point query for the given user's document
  _query(userId) {
    return this._hz('users').find(userId)
  }

  // Fetches a snapshot of the user document. Errors if the handshake
  // carries no user id.
  fetch() {
    return this._before.mergeMap(handshake => {
      if (handshake.id == null) {
        throw new Error('Unauthenticated users have no user document')
      } else {
        return this._query(handshake.id).fetch()
      }
    }).take(1) // necessary so that we complete, since _before is
               // infinite
  }

  // Watches the user document; `args` are passed through to the
  // underlying term's watch(). Errors if the handshake carries no
  // user id.
  watch(...args) {
    return this._before.mergeMap(handshake => {
      // Loose comparison, as in fetch(), so that a missing
      // (undefined) id is rejected as well as an explicit null
      if (handshake.id == null) {
        throw new Error('Unauthenticated users have no user document')
      } else {
        return this._query(handshake.id).watch(...args)
      }
    })
  }
}