All files / src index.js

91.49% Statements 43/47
72.41% Branches 21/29
92.86% Functions 13/14
91.49% Lines 43/47
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142                        1x   1x                             7x 7x   7x 7x                 7x   6x 2x                             10x     7x 4x   7x 3x             7x 1x   4x               7x     7x 2x     7x 6x     7x     7x         7x   7x 7x 7x 7x 7x 7x 7x   7x           677x 677x   1678x         28x 5x 4x   1x         1x 1x   1x  
import 'rxjs/add/observable/of'
import 'rxjs/add/observable/from'
import 'rxjs/add/operator/catch'
import 'rxjs/add/operator/concatMap'
import 'rxjs/add/operator/map'
import 'rxjs/add/operator/filter'
 
import { Collection, UserDataTerm } from './ast'
import { HorizonSocket } from './socket'
import { authEndpoint, TokenStorage, clearAuthTokens } from './auth'
import { aggregate, model } from './model'
 
const defaultHost = typeof window !== 'undefined' && window.location &&
        `${window.location.host}` || 'localhost:8181'
const defaultSecure = typeof window !== 'undefined' && window.location &&
        window.location.protocol === 'https:' || false
 
function Horizon({
  host = defaultHost,
  secure = defaultSecure,
  path = 'horizon',
  lazyWrites = false,
  authType = 'unauthenticated',
  keepalive = 60,
  WebSocketCtor = WebSocket,
} = {}) {
  // If we're in a redirection from OAuth, store the auth token for
  // this user in localStorage.
 
  const tokenStorage = new TokenStorage({ authType, path })
  tokenStorage.setAuthFromQueryParams()
 
  const url = `ws${secure ? 's' : ''}:\/\/${host}\/${path}`
  const socket = new HorizonSocket({
    url,
    handshakeMaker: tokenStorage.handshake.bind(tokenStorage),
    keepalive,
    WebSocketCtor,
  })
 
  // Store whatever token we get back from the server when we get a
  // handshake response
  socket.handshake.subscribe({
    next(handshake) {
      if (authType !== 'unauthenticated') {
        tokenStorage.set(handshake.token)
      }
    },
    error(err) {
      if (/JsonWebTokenError|TokenExpiredError/.test(err.message)) {
        console.error('Horizon: clearing token storage since auth failed')
        tokenStorage.remove()
      }
    },
  })
 
  // This is the object returned by the Horizon function. It's a
  // function so we can construct a collection simply by calling it
  // like horizon('my_collection')
  function horizon(name) {
    return new Collection(sendRequest, name, lazyWrites)
  }
 
  horizon.currentUser = () =>
    new UserDataTerm(horizon, socket.handshake, socket)
 
  horizon.disconnect = () => {
    socket.complete()
  }
 
  // Dummy subscription to force it to connect to the
  // server. Optionally provide an error handling function if the
  // socket experiences an error.
  // Note: Users of the Observable interface shouldn't need this
  horizon.connect = (
    onError = err => { console.error(`Received an error: ${err}`) }
  ) => {
    socket.subscribe(
      () => {},
      onError
    )
  }
 
  // Either subscribe to status updates, or return an observable with
  // the current status and all subsequent status changes.
  horizon.status = subscribeOrObservable(socket.status)
 
  // Convenience method for finding out when disconnected
  horizon.onDisconnected = subscribeOrObservable(
    socket.status.filter(x => x.type === 'disconnected'))
 
  // Convenience method for finding out when ready
  horizon.onReady = subscribeOrObservable(
    socket.status.filter(x => x.type === 'ready'))
 
  // Convenience method for finding out when an error occurs
  horizon.onSocketError = subscribeOrObservable(
    socket.status.filter(x => x.type === 'error'))
 
  horizon.utensils = {
    sendRequest,
    tokenStorage,
    handshake: socket.handshake,
  }
  Object.freeze(horizon.utensils)
 
  horizon._authMethods = null
  horizon._root = `http${(secure) ? 's' : ''}://${host}`
  horizon._horizonPath = `${horizon._root}/${path}`
  horizon.authEndpoint = authEndpoint
  horizon.hasAuthToken = tokenStorage.hasAuthToken.bind(tokenStorage)
  horizon.aggregate = aggregate
  horizon.model = model
 
  return horizon
 
  // Sends a horizon protocol request to the server, and pulls the data
  // portion of the response out.
  function sendRequest(type, options) {
    // Both remove and removeAll use the type 'remove' in the protocol
    const normalizedType = type === 'removeAll' ? 'remove' : type
    return socket
      .hzRequest({ type: normalizedType, options }) // send the raw request
      .takeWhile(resp => resp.state !== 'complete')
  }
}
 
function subscribeOrObservable(observable) {
  return (...args) => {
    if (args.length > 0) {
      return observable.subscribe(...args)
    } else {
      return observable
    }
  }
}
 
Horizon.Socket = HorizonSocket
Horizon.clearAuthTokens = clearAuthTokens
 
module.exports = Horizon