// Signaling.js

import EventEmitter from 'events'
import TransactionManager from 'transaction-manager'
import Logger from './Logger'
import SdpParser from './utils/SdpParser'
import { VideoCodec } from './utils/Codecs'
import PeerConnection from './PeerConnection'
import Diagnostics from './utils/Diagnostics'

// Logger instance obtained under the name 'Signaling'; used for all log output in this module.
const logger = Logger.get('Signaling')

/**
 * Names of the events emitted by the Signaling class in this module.
 *
 * - connectionSuccess: emitted by connect() once a WebSocket is open (or already was).
 * - connectionError: emitted from the WebSocket `onerror` handler.
 * - connectionClose: emitted from the WebSocket `onclose` handler.
 * - broadcastEvent: emitted for every 'event' message received through the transaction manager.
 */
export const signalingEvents = {
  connectionSuccess: 'wsConnectionSuccess',
  connectionError: 'wsConnectionError',
  connectionClose: 'wsConnectionClose',
  broadcastEvent: 'broadcastEvent'
}

/**
 * @typedef {Object} LayerInfo
 * @property {String} encodingId         - rid value of the simulcast encoding of the track  (default: automatic selection)
 * @property {Number} spatialLayerId     - The spatial layer id to send to the outgoing stream (default: max layer available)
 * @property {Number} temporalLayerId    - The temporal layer id to send to the outgoing stream (default: max layer available)
 * @property {Number} maxSpatialLayerId  - Max spatial layer id (default: unlimited)
 * @property {Number} maxTemporalLayerId - Max temporal layer id (default: unlimited)
 */

/**
 * @typedef {Object} SignalingSubscribeOptions
 * @property {String} vad - Enable VAD multiplexing for secondary sources.
 * @property {String} pinnedSourceId - Id of the main source that will be received by the default MediaStream.
 * @property {Array<String>} excludedSourceIds - Do not receive media from these source ids.
 * @property {Array<String>} events - Override which events will be delivered by the server ("active" | "inactive" | "vad" | "layers" | "updated").
 * @property {LayerInfo} layer - Select the simulcast encoding layer and svc layers for the main video track, leave empty for automatic layer selection based on bandwidth estimation.
 */

/**
 * @typedef {Object} SignalingPublishOptions
 * @property {VideoCodec} [codec="h264"] - Codec for publish stream.
 * @property {Boolean} [record] - Enable stream recording. If record is not provided, use default Token configuration. **Only available in Tokens with recording enabled.**
 * @property {String} [sourceId] - Source unique id. **Only available in Tokens with multisource enabled.**
 * @property {Array<String>} events - Override which events will be delivered by the server ("active" | "inactive").
 */

/**
 * @class Signaling
 * @extends EventEmitter
 * @classdesc Starts WebSocket connection and manages the messages between peers.
 * @example const millicastSignaling = new Signaling(options)
 * @constructor
 * @param {Object} options - General signaling options.
 * @param {String} options.streamName - Millicast stream name to get subscribed.
 * @param {String} options.url - WebSocket URL to signal Millicast server and establish a WebRTC connection.
 */

export default class Signaling extends EventEmitter {
  /**
   * Stores the stream name and WebSocket URL. No connection is opened until connect() is called.
   *
   * Note: the default value applies only when `options` is omitted entirely; a partially
   * filled object is used as given (missing properties stay undefined).
   *
   * @param {Object} [options] - General signaling options.
   * @param {String} options.streamName - Stream name used by the view/publish commands.
   * @param {String} options.url - WebSocket URL of the signaling server.
   */
  constructor (options = {
    streamName: null,
    url: 'ws://localhost:8080/'
  }
  ) {
    super()
    this.streamName = options.streamName
    this.wsUrl = options.url
    this.webSocket = null
    this.transactionManager = null
    this.serverId = null
    this.clusterId = null
    this.streamViewId = null
  }

  /**
   * Starts a WebSocket connection with signaling server.
   *
   * If a transaction manager exists and the current socket is already OPEN, that socket is
   * reused and wsConnectionSuccess is emitted again. Otherwise a new WebSocket and
   * TransactionManager are created. The returned promise resolves on `open` and rejects
   * with the socket URL on `error`.
   *
   * @example const response = await millicastSignaling.connect()
   * @returns {Promise<WebSocket>} Promise object which represents the [WebSocket object]{@link https://developer.mozilla.org/en-US/docs/Web/API/WebSockets_API} of the established connection.
   * @fires Signaling#wsConnectionSuccess
   * @fires Signaling#wsConnectionError
   * @fires Signaling#wsConnectionClose
   * @fires Signaling#broadcastEvent
   */
  async connect () {
    logger.info('Connecting to Signaling Server')

    // Shared by the "already connected" and the "freshly opened" paths.
    const announceConnection = (webSocket, transactionManager) => {
      logger.info('Connected to server: ', webSocket.url)
      logger.debug('WebSocket value: ', {
        url: webSocket.url,
        protocol: webSocket.protocol,
        readyState: webSocket.readyState,
        binaryType: webSocket.binaryType,
        extensions: webSocket.extensions
      })
      /**
       * WebSocket connection was successfully established with signaling server.
       *
       * @event Signaling#wsConnectionSuccess
       * @type {Object}
       * @property {WebSocket} ws - WebSocket object which represents active connection.
       * @property {TransactionManager} tm - [TransactionManager](https://github.com/medooze/transaction-manager) object that simplify WebSocket commands.
       */
      this.emit(signalingEvents.connectionSuccess, { ws: webSocket, tm: transactionManager })
    }

    if (this.transactionManager && this.webSocket?.readyState === WebSocket.OPEN) {
      announceConnection(this.webSocket, this.transactionManager)
      return this.webSocket
    }

    return new Promise((resolve, reject) => {
      // The handlers below close over these per-attempt references instead of reading
      // this.webSocket / this.transactionManager: a late event from an older socket must
      // neither dereference fields that were reset nor touch a newer connection.
      const webSocket = new WebSocket(this.wsUrl)
      this.webSocket = webSocket
      const transactionManager = new TransactionManager(webSocket)
      this.transactionManager = transactionManager

      webSocket.onopen = () => {
        logger.info('WebSocket opened')
        transactionManager.on('event', (evt) => {
          /**
           * Passthrough of available Millicast broadcast events.
           *
           * Active - Fires when the live stream is, or has started broadcasting.
           *
           * Inactive - Fires when the stream has stopped broadcasting, but is still available.
           *
           * Stopped - Fires when the stream has stopped for a given reason.
           *
           * Vad - Fires when using multiplexed tracks for audio.
           *
           * Layers - Fires when there is an update of the state of the layers in a stream (when broadcasting with simulcast).
           *
           * Migrate - Fires when the server is having problems, is shutting down or when viewers need to move for load balancing purposes.
           *
           * Viewercount - Fires when the viewer count changes.
           *
           * Updated - when an active stream's tracks are updated
           *
           * More information here: {@link https://docs.dolby.io/streaming-apis/docs/web#broadcast-events}
           *
           * @event Signaling#broadcastEvent
           * @type {Object}
           * @property {String} type - In this case the type of this message is "event".
           * @property {("active" | "inactive" | "stopped" | "vad" | "layers" | "migrate" | "viewercount" | "updated")} name - Event name.
           * @property {Object} data - Custom event data.
           */
          this.emit(signalingEvents.broadcastEvent, evt)
        })
        announceConnection(webSocket, transactionManager)
        resolve(webSocket)
      }

      webSocket.onerror = () => {
        logger.error('WebSocket not connected: ', webSocket.url)
        /**
         * WebSocket connection failed with signaling server.
         * Returns url of WebSocket
         *
         * @event Signaling#wsConnectionError
         * @type {String}
         */
        this.emit(signalingEvents.connectionError, webSocket.url)
        reject(webSocket.url)
      }

      webSocket.onclose = () => {
        // Only reset the instance state when the socket that closed is still the current
        // one; otherwise a reconnect started before this event would be wiped out.
        if (this.webSocket === webSocket) {
          this.webSocket = null
          this.transactionManager = null
        }
        logger.info('Connection closed with Signaling Server.')
        /**
         * WebSocket connection with signaling server was successfully closed.
         *
         * @event Signaling#wsConnectionClose
         */
        this.emit(signalingEvents.connectionClose)
      }
    })
  }

  /**
   * Close WebSocket connection with Millicast server.
   * Does nothing when there is no current WebSocket.
   * @example millicastSignaling.close()
   */
  close () {
    logger.info('Closing connection with Signaling Server.')
    this.webSocket?.close()
  }

  /**
   * Establish WebRTC connection with Millicast Server as Subscriber role.
   *
   * Connects (if needed), sends the `view` command and stores the returned subscriber,
   * cluster and stream-view ids on the instance and in Diagnostics.
   *
   * @param {String} sdp - The SDP information created by your offer.
   * @param {SignalingSubscribeOptions} options - Signaling Subscribe Options. A non-object value is treated as the legacy `vad` argument.
   * @param {String} [pinnedSourceId=null] - Legacy positional argument, used only when `options` is not a populated object.
   * @param {Array<String>} [excludedSourceIds=null] - Legacy positional argument, used only when `options` is not a populated object.
   * @example const response = await millicastSignaling.subscribe(sdp)
   * @return {Promise<String>} Promise object which represents the SDP command response.
   * @throws {Error} When both `disableVideo` and `disableAudio` are set, or when connecting / the command fails (the error is logged and rethrown).
   */
  async subscribe (sdp, options, pinnedSourceId = null, excludedSourceIds = null) {
    logger.info('Starting subscription to streamName: ', this.streamName)
    logger.debug('Subcription local description: ', sdp)
    const optionsParsed = getSubscribeOptions(options, pinnedSourceId, excludedSourceIds)

    // Signaling server only recognizes 'AV1' and not 'AV1X'
    sdp = SdpParser.adaptCodecName(sdp, 'AV1X', VideoCodec.AV1)

    // default events
    const events = ['active', 'inactive', 'layers', 'viewercount', 'vad', 'updated', 'migrate', 'stopped']
    const data = { sdp, streamId: this.streamName, pinnedSourceId: optionsParsed.pinnedSourceId, excludedSourceIds: optionsParsed.excludedSourceIds, events }

    // Optional fields are only added to the command payload when provided.
    if (optionsParsed.vad) { data.vad = true }
    if (Array.isArray(optionsParsed.events)) { data.events = optionsParsed.events }
    if (optionsParsed.forcePlayoutDelay) { data.forcePlayoutDelay = optionsParsed.forcePlayoutDelay }
    if (optionsParsed.layer) { data.layer = optionsParsed.layer }

    try {
      if (optionsParsed.disableVideo && optionsParsed.disableAudio) {
        throw new Error('Not attempting to connect as video and audio are disabled')
      }
      await this.connect()
      logger.info('Sending view command')
      const result = await this.transactionManager.cmd('view', data)

      // Check if browser supports AV1X
      const AV1X = RTCRtpReceiver.getCapabilities?.('video')?.codecs?.find?.(codec => codec.mimeType === 'video/AV1X')
      // Signaling server returns 'AV1'. If browser supports AV1X, we change it to AV1X
      result.sdp = AV1X ? SdpParser.adaptCodecName(result.sdp, VideoCodec.AV1, 'AV1X') : result.sdp

      logger.info('Command sent, subscriberId: ', result.subscriberId)
      logger.debug('Command result: ', result)
      this.serverId = result.subscriberId
      this.clusterId = result.clusterId
      this.streamViewId = result.streamViewId

      // Save for diagnostics
      Diagnostics.initStreamName(this.streamName)
      Diagnostics.initSubscriberId(this.serverId)
      Diagnostics.initStreamViewId(result.streamViewId)
      Diagnostics.setClusterId(this.clusterId)
      return result.sdp
    } catch (e) {
      logger.error('Error sending view command, error: ', e)
      throw e
    }
  }

  /**
   * Establish WebRTC connection with Millicast Server as Publisher role.
   *
   * Validates the requested codec, connects (if needed), sends the `publish` command and
   * stores the returned publisher and cluster ids on the instance and in Diagnostics.
   *
   * @param {String} sdp - The SDP information created by your offer.
   * @param {SignalingPublishOptions} options - Signaling Publish Options. A non-object value is treated as the legacy `codec` argument.
   * @param {Boolean} [record=null] - Legacy positional argument, used only when `options` is not a populated object.
   * @param {String} [sourceId=null] - Legacy positional argument, used only when `options` is not a populated object.
   * @example const response = await millicastSignaling.publish(sdp, {codec: 'h264'})
   * @return {Promise<String>} Promise object which represents the SDP command response.
   * @throws {Error} When the codec is not a VideoCodec value or not reported as supported, when `priority` is not a 32-bit integer, when both `disableVideo` and `disableAudio` are set, or when connecting / the command fails.
   */
  async publish (sdp, options, record = null, sourceId = null) {
    const optionsParsed = getPublishOptions(options, record, sourceId)

    logger.info(`Starting publishing to streamName: ${this.streamName}, codec: ${optionsParsed.codec}`)
    logger.debug('Publishing local description: ', sdp)
    const supportedVideoCodecs = PeerConnection.getCapabilities?.('video')?.codecs?.map(cdc => cdc.codec) ?? []

    const videoCodecs = Object.values(VideoCodec)
    if (!videoCodecs.includes(optionsParsed.codec)) {
      logger.error(`Invalid codec ${optionsParsed.codec}. Possible values are: `, videoCodecs)
      throw new Error(`Invalid codec ${optionsParsed.codec}. Possible values are: ${videoCodecs}`)
    }

    // An empty capability list means "unknown", so the support check is skipped in that case.
    if (supportedVideoCodecs.length > 0 && !supportedVideoCodecs.includes(optionsParsed.codec)) {
      logger.error(`Unsupported codec ${optionsParsed.codec}. Possible values are: `, supportedVideoCodecs)
      throw new Error(`Unsupported codec ${optionsParsed.codec}. Possible values are: ${supportedVideoCodecs}`)
    }

    // Signaling server only recognizes 'AV1' and not 'AV1X'
    if (optionsParsed.codec === VideoCodec.AV1) {
      sdp = SdpParser.adaptCodecName(sdp, 'AV1X', VideoCodec.AV1)
    }

    const data = {
      name: this.streamName,
      sdp,
      codec: optionsParsed.codec,
      sourceId: optionsParsed.sourceId
    }

    // Falsy priorities (including 0) are not sent; any other value must be a 32-bit signed integer.
    if (optionsParsed.priority) {
      if (
        Number.isInteger(optionsParsed.priority) &&
        optionsParsed.priority >= -2147483648 &&
        optionsParsed.priority <= 2147483647
      ) {
        data.priority = optionsParsed.priority
      } else {
        throw new Error('Invalid value for priority option. It should be a decimal integer between the range [-2^31, +2^31 - 1]')
      }
    }

    if (optionsParsed.record !== null) {
      data.record = optionsParsed.record
    }
    if (Array.isArray(optionsParsed.events)) {
      data.events = optionsParsed.events
    }
    try {
      if (optionsParsed.disableVideo && optionsParsed.disableAudio) {
        throw new Error('Not attempting to connect as video and audio are disabled')
      }
      await this.connect()
      logger.info('Sending publish command')
      const result = await this.transactionManager.cmd('publish', data)

      if (optionsParsed.codec === VideoCodec.AV1) {
        // If browser supports AV1X, we change from AV1 to AV1X
        const AV1X = RTCRtpSender.getCapabilities?.('video')?.codecs?.find?.(codec => codec.mimeType === 'video/AV1X')
        result.sdp = AV1X ? SdpParser.adaptCodecName(result.sdp, VideoCodec.AV1, 'AV1X') : result.sdp
      }

      logger.info('Command sent, publisherId: ', result.publisherId)
      logger.debug('Command result: ', result)
      this.serverId = result.publisherId
      this.clusterId = result.clusterId

      // Save for diagnostics
      Diagnostics.initStreamName(this.streamName)
      // NOTE(review): the publisher id is stored through initSubscriberId — presumably
      // Diagnostics keeps a single id for both roles; confirm against Diagnostics.
      Diagnostics.initSubscriberId(this.serverId)
      Diagnostics.initFeedId(result.feedId)
      Diagnostics.setClusterId(this.clusterId)
      return result.sdp
    } catch (e) {
      logger.error('Error sending publish command, error: ', e)
      throw e
    }
  }

  /**
   * Send command to the server.
   * Forwards to the current transaction manager; connect() must have been called first,
   * otherwise there is no transaction manager and the returned promise rejects.
   * @param {String} cmd - Command name.
   * @param {Object} [data] - Command parameters.
   * @return {Promise<Object>} Promise object which represents the command response.
   */
  async cmd (cmd, data) {
    logger.info(`Sending cmd: ${cmd}`)

    return this.transactionManager.cmd(cmd, data)
  }
}

/**
 * Normalizes the arguments of Signaling#subscribe into a single options object.
 *
 * A non-empty options object is returned as is. Anything else is treated as the legacy
 * positional signature `(vad, pinnedSourceId, excludedSourceIds)`.
 *
 * @param {SignalingSubscribeOptions|*} options - Options object, or the legacy `vad` value.
 * @param {String} legacyPinnedSourceId - Legacy positional pinned source id.
 * @param {Array<String>} legacyExcludedSourceIds - Legacy positional excluded source ids.
 * @returns {Object} Options object read by Signaling#subscribe.
 */
const getSubscribeOptions = (options, legacyPinnedSourceId, legacyExcludedSourceIds) => {
  // typeof null === 'object', so null has to be excluded explicitly.
  const isOptionsObject = options !== null && typeof options === 'object'
  if (isOptionsObject && Object.keys(options).length > 0) {
    return options
  }
  return {
    // An empty options object (or null) is not a legacy `vad` value; forwarding it would
    // be truthy and silently enable VAD.
    vad: isOptionsObject ? undefined : (options ?? undefined),
    pinnedSourceId: legacyPinnedSourceId,
    excludedSourceIds: legacyExcludedSourceIds
  }
}

/**
 * Normalizes the arguments of Signaling#publish into a single options object.
 *
 * A non-empty options object is shallow-copied with `codec` defaulting to H264 when it is
 * missing. Anything else is treated as the legacy positional signature
 * `(codec, record, sourceId)`, where a missing codec also defaults to H264.
 *
 * @param {SignalingPublishOptions|String} options - Options object, or the legacy codec name.
 * @param {Boolean} legacyRecord - Legacy positional record flag.
 * @param {String} legacySourceId - Legacy positional source id.
 * @returns {Object} Options object read by Signaling#publish; `codec` is always set.
 */
const getPublishOptions = (options, legacyRecord, legacySourceId) => {
  const defaultCodec = VideoCodec.H264
  // typeof null === 'object', so null has to be excluded explicitly.
  const isOptionsObject = options !== null && typeof options === 'object'
  if (isOptionsObject && Object.keys(options).length > 0) {
    // Copy instead of mutating the caller's object when filling in the default codec.
    return { ...options, codec: options.codec ?? defaultCodec }
  }
  return {
    // An empty options object is not a codec name; fall back to the default.
    codec: isOptionsObject ? defaultCodec : (options ?? defaultCodec),
    record: legacyRecord,
    sourceId: legacySourceId
  }
}