All files / src socket.js

89.47% Statements 51/57
73.33% Branches 11/15
86.36% Functions 19/22
89.29% Lines 50/56
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203                                1x     1x   1x   1x   1x       29x 29x                             1062x             1193x 1193x                 7x         6x       4x 3x 3x   4x           7x 7x 7x   7x           7x     7x       7x 7x     1x         683x 595x 595x         683x 598x 598x         2727x       683x             683x 6x     6x       6x 6x 6x                     6x   683x               683x   683x                               677x     1050x 29x   1021x   1021x   558x           1021x          
import { AsyncSubject } from 'rxjs/AsyncSubject'
import { BehaviorSubject } from 'rxjs/BehaviorSubject'
import { WebSocketSubject } from 'rxjs/observable/dom/WebSocketSubject'
import { Observable } from 'rxjs/Observable'
import { Subscription } from 'rxjs/Subscription'
import 'rxjs/add/observable/merge'
import 'rxjs/add/observable/timer'
import 'rxjs/add/operator/filter'
import 'rxjs/add/operator/share'
import 'rxjs/add/operator/ignoreElements'
import 'rxjs/add/operator/concat'
import 'rxjs/add/operator/takeWhile'
import 'rxjs/add/operator/publish'
 
import { serialize, deserialize } from './serialization.js'
 
const PROTOCOL_VERSION = 'rethinkdb-horizon-v0'
 
// Before connecting the first time
const STATUS_UNCONNECTED = { type: 'unconnected' }
// After the websocket is opened and handshake is completed
const STATUS_READY = { type: 'ready' }
// After unconnected, maybe before or after connected. Any socket level error
const STATUS_ERROR = { type: 'error' }
// Occurs when the socket closes
const STATUS_DISCONNECTED = { type: 'disconnected' }
 
class ProtocolError extends Error {
  constructor(msg, errorCode) {
    super(msg)
    this.errorCode = errorCode
  }
  toString() {
    return `${this.message} (Code: ${this.errorCode})`
  }
}
 
 
// Wraps native websockets with a Subject, which is both an Subscriber
// and an Observable (it is bi-directional after all!). This version
// is based on the rxjs.observable.dom.WebSocketSubject implementation.
export class HorizonSocket extends WebSocketSubject {
  // Deserializes a message from a string. Overrides the version
  // implemented in WebSocketSubject
  resultSelector(e) {
    return deserialize(JSON.parse(e.data))
  }
 
  // We're overriding the next defined in AnonymousSubject so we
  // always serialize the value. When this is called a message will be
  // sent over the socket to the server.
  next(value) {
    const request = JSON.stringify(serialize(value))
    super.next(request)
  }
 
  constructor({
    url,              // Full url to connect to
    handshakeMaker, // function that returns handshake to emit
    keepalive = 60,   // seconds between keepalive messages
    WebSocketCtor = WebSocket,    // optionally provide a WebSocket constructor
  } = {}) {
    super({
      url,
      protocol: PROTOCOL_VERSION,
      WebSocketCtor,
      openObserver: {
        next: () => this.sendHandshake(),
      },
      closeObserver: {
        next: () => {
          if (this._handshakeSub) {
            this._handshakeSub.unsubscribe()
            this._handshakeSub = null
          }
          this.status.next(STATUS_DISCONNECTED)
        },
      },
    })
    // Completes or errors based on handshake success. Buffers
    // handshake response for later subscribers (like a Promise)
    this.handshake = new AsyncSubject()
    this._handshakeMaker = handshakeMaker
    this._handshakeSub = null
 
    this.keepalive = Observable
      .timer(keepalive * 1000, keepalive * 1000)
      .map(n => this.makeRequest({ type: 'keepalive' }).subscribe())
      .publish()
 
    // This is used to emit status changes that others can hook into.
    this.status = new BehaviorSubject(STATUS_UNCONNECTED)
    // Keep track of subscribers so we's can decide when to
    // unsubscribe.
    this.requestCounter = 0
    // A map from request_ids to an object with metadata about the
    // request. Eventually, this should allow re-sending requests when
    // reconnecting.
    this.activeRequests = new Map()
    this._output.subscribe({
      // This emits if the entire socket errors (usually due to
      // failure to connect)
      error: () => this.status.next(STATUS_ERROR),
    })
  }
 
  deactivateRequest(req) {
    return () => {
      this.activeRequests.delete(req.request_id)
      return { request_id: req.request_id, type: 'end_subscription' }
    }
  }
 
  activateRequest(req) {
    return () => {
      this.activeRequests.set(req.request_id, req)
      return req
    }
  }
 
  filterRequest(req) {
    return resp => resp.request_id === req.request_id
  }
 
  getRequest(request) {
    return Object.assign({ request_id: this.requestCounter++ }, request)
  }
 
  // This is a trimmed-down version of multiplex that only listens for
  // the handshake requestId. It also starts the keepalive observable
  // and cleans up after it when the handshake is cleaned up.
  sendHandshake() {
    if (!this._handshakeSub) {
      this._handshakeSub = this.makeRequest(this._handshakeMaker())
        .subscribe({
          next: n => {
            Iif (n.error) {
              this.status.next(STATUS_ERROR)
              this.handshake.error(new ProtocolError(n.error, n.error_code))
            } else {
              this.status.next(STATUS_READY)
              this.handshake.next(n)
              this.handshake.complete()
            }
          },
          error: e => {
            this.status.next(STATUS_ERROR)
            this.handshake.error(e)
          },
        })
 
      // Start the keepalive and make sure it's
      // killed when the handshake is cleaned up
      this._handshakeSub.add(this.keepalive.connect())
    }
    return this.handshake
  }
 
  // Incorporates shared logic between the inital handshake request and
  // all subsequent requests.
  // * Generates a request id and filters by it
  // * Send `end_subscription` when observable is unsubscribed
  makeRequest(rawRequest) {
    const request = this.getRequest(rawRequest)
 
    return super.multiplex(
      this.activateRequest(request),
      this.deactivateRequest(request),
      this.filterRequest(request)
    )
  }
 
  // Wrapper around the makeRequest with the following additional
  // features we need for horizon's protocol:
  // * Sends handshake on subscription if it hasn't happened already
  // * Wait for the handshake to complete before sending the request
  // * Errors when a document with an `error` field is received
  // * Completes when `state: complete` is received
  // * Emits `state: synced` as a separate document for easy filtering
  // * Reference counts subscriptions
  hzRequest(rawRequest) {
    return this.sendHandshake().ignoreElements()
      .concat(this.makeRequest(rawRequest))
      .concatMap(resp => {
        if (resp.error !== undefined) {
          throw new ProtocolError(resp.error, resp.error_code)
        }
        const data = resp.data || []
 
        if (resp.state !== undefined) {
          // Create a little dummy object for sync notifications
          data.push({
            type: 'state',
            state: resp.state,
          })
        }
 
        return data
      })
      .share()
  }
}