Signaling.js

import EventEmitter from 'events'
import TransactionManager from 'transaction-manager'
import Logger from './Logger'
import SdpParser from './utils/SdpParser'
import { VideoCodec } from './utils/Codecs'
import PeerConnection from './PeerConnection'
import Diagnostics from './utils/Diagnostics'

const logger = Logger.get('Signaling')

export const signalingEvents = {
  connectionSuccess: 'wsConnectionSuccess',
  connectionError: 'wsConnectionError',
  connectionClose: 'wsConnectionClose',
  broadcastEvent: 'broadcastEvent'
}
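
// A minimal sketch of how these event names are typically consumed; the
// `millicastSignaling` instance and the handler bodies are illustrative,
// only the event names come from this module:
//
//   millicastSignaling.on(signalingEvents.connectionSuccess, ({ ws, tm }) => {
//     console.log('Signaling connected to', ws.url)
//   })
//   millicastSignaling.on(signalingEvents.broadcastEvent, (event) => {
//     console.log('Broadcast event received:', event.name, event.data)
//   })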

/**
 * @typedef {Object} LayerInfo
 * @property {String} encodingId         - rid value of the simulcast encoding of the track (default: automatic selection)
 * @property {Number} spatialLayerId     - The spatial layer id to send to the outgoing stream (default: max layer available)
 * @property {Number} temporalLayerId    - The temporal layer id to send to the outgoing stream (default: max layer available)
 * @property {Number} maxSpatialLayerId  - Max spatial layer id (default: unlimited)
 * @property {Number} maxTemporalLayerId - Max temporal layer id (default: unlimited)
 */
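
// A hedged illustration of a LayerInfo object; the rid value and layer ids are
// examples (they depend on how the publisher configured simulcast/SVC), not
// values defined by this module:
//
//   const layer = { encodingId: '1', maxTemporalLayerId: 1 }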

/**
 * @typedef {Object} SignalingSubscribeOptions
 * @property {Boolean} vad - Enable VAD multiplexing for secondary sources.
 * @property {String} pinnedSourceId - Id of the main source that will be received by the default MediaStream.
 * @property {Array<String>} excludedSourceIds - Do not receive media from these source ids.
 * @property {Array<String>} events - Override which events will be delivered by the server ("active" | "inactive" | "vad" | "layers" | "updated").
 * @property {LayerInfo} layer - Select the simulcast encoding layer and svc layers for the main video track, leave empty for automatic layer selection based on bandwidth estimation.
 */
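
// A usage sketch for SignalingSubscribeOptions; the source ids and event list
// below are illustrative values, not defaults of this module:
//
//   const subscribeOptions = {
//     pinnedSourceId: 'mainCamera',
//     excludedSourceIds: ['screenShare'],
//     events: ['active', 'inactive', 'layers']
//   }
//   const answerSdp = await millicastSignaling.subscribe(offerSdp, subscribeOptions)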

/**
 * @typedef {Object} SignalingPublishOptions
 * @property {VideoCodec} [codec="h264"] - Codec for publish stream.
 * @property {Boolean} [record] - Enable stream recording. If record is not provided, use default Token configuration. **Only available in Tokens with recording enabled.**
 * @property {String} [sourceId] - Source unique id. **Only available in Tokens with multisource enabled.**
 * @property {Array<String>} events - Override which events will be delivered by the server ("active" | "inactive").
 */
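
// A usage sketch for SignalingPublishOptions; the sourceId is illustrative, and
// recording and multisource require a Token with those features enabled:
//
//   const publishOptions = { codec: 'h264', record: true, sourceId: 'camera1', events: ['active', 'inactive'] }
//   const answerSdp = await millicastSignaling.publish(offerSdp, publishOptions)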

/**
 * @class Signaling
 * @extends EventEmitter
 * @classdesc Starts a WebSocket connection and manages the messages between peers.
 * @example const millicastSignaling = new Signaling(options)
 * @constructor
 * @param {Object} options - General signaling options.
 * @param {String} options.streamName - Millicast stream name to subscribe to.
 * @param {String} options.url - WebSocket URL to signal Millicast server and establish a WebRTC connection.
 */

export default class Signaling extends EventEmitter {
  constructor (
    options = {
      streamName: null,
      url: 'ws://localhost:8080/'
    }
  ) {
    super()
    this.streamName = options.streamName
    this.wsUrl = options.url
    this.webSocket = null
    this.transactionManager = null
    this.serverId = null
    this.clusterId = null
    this.streamViewId = null
  }

  /**
   * Starts a WebSocket connection with the signaling server.
   * @example const response = await millicastSignaling.connect()
   * @returns {Promise<WebSocket>} Promise object which represents the [WebSocket object]{@link https://developer.mozilla.org/en-US/docs/Web/API/WebSockets_API} of the established connection.
   * @fires Signaling#wsConnectionSuccess
   * @fires Signaling#wsConnectionError
   * @fires Signaling#wsConnectionClose
   * @fires Signaling#broadcastEvent
   */
  async connect () {
    logger.info('Connecting to Signaling Server')
    if (this.transactionManager && this.webSocket?.readyState === WebSocket.OPEN) {
      logger.info('Connected to server: ', this.webSocket.url)
      logger.debug('WebSocket value: ', {
        url: this.webSocket.url,
        protocol: this.webSocket.protocol,
        readyState: this.webSocket.readyState,
        binaryType: this.webSocket.binaryType,
        extensions: this.webSocket.extensions
      })
      /**
       * WebSocket connection was successfully established with the signaling server.
       *
       * @event Signaling#wsConnectionSuccess
       * @type {Object}
       * @property {WebSocket} ws - WebSocket object which represents the active connection.
       * @property {TransactionManager} tm - [TransactionManager](https://github.com/medooze/transaction-manager) object that simplifies WebSocket commands.
       */
      this.emit(signalingEvents.connectionSuccess, { ws: this.webSocket, tm: this.transactionManager })
      return this.webSocket
    }

    return new Promise((resolve, reject) => {
      this.webSocket = new WebSocket(this.wsUrl)
      this.transactionManager = new TransactionManager(this.webSocket)
      this.webSocket.onopen = () => {
        logger.info('WebSocket opened')
        this.transactionManager.on('event', (evt) => {
          /**
           * Passthrough of available Millicast broadcast events.
           *
           * Active - Fires when the live stream is, or has started, broadcasting.
           *
           * Inactive - Fires when the stream has stopped broadcasting, but is still available.
           *
           * Stopped - Fires when the stream has stopped for a given reason.
           *
           * Vad - Fires when using multiplexed tracks for audio.
           *
           * Layers - Fires when there is an update of the state of the layers in a stream (when broadcasting with simulcast).
           *
           * Migrate - Fires when the server is having problems, is shutting down, or when viewers need to move for load balancing purposes.
           *
           * Viewercount - Fires when the viewer count changes.
           *
           * Updated - Fires when an active stream's tracks are updated.
           *
           * More information here: {@link https://docs.dolby.io/streaming-apis/docs/web#broadcast-events}
           *
           * @event Signaling#broadcastEvent
           * @type {Object}
           * @property {String} type - In this case the type of this message is "event".
           * @property {("active" | "inactive" | "stopped" | "vad" | "layers" | "migrate" | "viewercount" | "updated")} name - Event name.
           * @property {Object} data - Custom event data.
           */
          this.emit(signalingEvents.broadcastEvent, evt)
        })
        logger.info('Connected to server: ', this.webSocket.url)
        logger.debug('WebSocket value: ', {
          url: this.webSocket.url,
          protocol: this.webSocket.protocol,
          readyState: this.webSocket.readyState,
          binaryType: this.webSocket.binaryType,
          extensions: this.webSocket.extensions
        })
        this.emit(signalingEvents.connectionSuccess, { ws: this.webSocket, tm: this.transactionManager })
        resolve(this.webSocket)
      }
      this.webSocket.onerror = () => {
        logger.error('WebSocket not connected: ', this.webSocket.url)
        /**
         * WebSocket connection with the signaling server failed.
         * Returns the URL of the WebSocket.
         *
         * @event Signaling#wsConnectionError
         * @type {String}
         */
        this.emit(signalingEvents.connectionError, this.webSocket.url)
        reject(this.webSocket.url)
      }
      this.webSocket.onclose = () => {
        this.webSocket = null
        this.transactionManager = null
        logger.info('Connection closed with Signaling Server.')
        /**
         * WebSocket connection with the signaling server was successfully closed.
         *
         * @event Signaling#wsConnectionClose
         */
        this.emit(signalingEvents.connectionClose)
      }
    })
  }

  /**
   * Close the WebSocket connection with the Millicast server.
   * @example millicastSignaling.close()
   */
  close () {
    logger.info('Closing connection with Signaling Server.')
    this.webSocket?.close()
  }

  /**
   * Establish a WebRTC connection with the Millicast server in the Subscriber role.
   * @param {String} sdp - The SDP information created by your offer.
   * @param {SignalingSubscribeOptions} options - Signaling Subscribe Options.
   * @example const response = await millicastSignaling.subscribe(sdp)
   * @return {Promise<String>} Promise object which represents the SDP command response.
   */
  async subscribe (sdp, options, pinnedSourceId = null, excludedSourceIds = null) {
    logger.info('Starting subscription to streamName: ', this.streamName)
    logger.debug('Subscription local description: ', sdp)
    const optionsParsed = getSubscribeOptions(options, pinnedSourceId, excludedSourceIds)

    // Signaling server only recognizes 'AV1' and not 'AV1X'
    sdp = SdpParser.adaptCodecName(sdp, 'AV1X', VideoCodec.AV1)

    // default events
    const events = ['active', 'inactive', 'layers', 'viewercount', 'vad', 'updated', 'migrate', 'stopped']
    const data = { sdp, streamId: this.streamName, pinnedSourceId: optionsParsed.pinnedSourceId, excludedSourceIds: optionsParsed.excludedSourceIds, events }

    if (optionsParsed.vad) {
      data.vad = true
    }
    if (Array.isArray(optionsParsed.events)) {
      data.events = optionsParsed.events
    }
    if (optionsParsed.forcePlayoutDelay) {
      data.forcePlayoutDelay = optionsParsed.forcePlayoutDelay
    }
    if (optionsParsed.layer) {
      data.layer = optionsParsed.layer
    }

    if (optionsParsed.abrConfiguration) {
      data.abr = {
        initialBitrate: optionsParsed.abrConfiguration.metadata?.bitrate,
        strategy: optionsParsed.abrConfiguration.strategy
      }
    }

    if (optionsParsed.customKeys) {
      data.customKeys = optionsParsed.customKeys
    }
    if (optionsParsed.forceSmooth) {
      data.abr = {
        ...(data.abr || {}),
        forceSmooth: optionsParsed.forceSmooth
      }
    }

    try {
      if (optionsParsed.disableVideo && optionsParsed.disableAudio) {
        throw new Error('Not attempting to connect as video and audio are disabled')
      }
      await this.connect()
      logger.info('Sending view command', data)
      const result = await this.transactionManager.cmd('view', data)

      // Check if browser supports AV1X
      const AV1X = RTCRtpReceiver.getCapabilities?.('video')?.codecs?.find?.((codec) => codec.mimeType === 'video/AV1X')
      // Signaling server returns 'AV1'. If browser supports AV1X, we change it to AV1X
      result.sdp = AV1X ? SdpParser.adaptCodecName(result.sdp, VideoCodec.AV1, 'AV1X') : result.sdp

      logger.info('Command sent, subscriberId: ', result.subscriberId)
      logger.debug('Command result: ', result)
      this.serverId = result.subscriberId
      this.clusterId = result.clusterId
      this.streamViewId = result.streamViewId

      // Save for diagnostics
      Diagnostics.initStreamName(this.streamName)
      Diagnostics.initSubscriberId(this.serverId)
      Diagnostics.initStreamViewId(result.streamViewId)
      Diagnostics.setClusterId(this.clusterId)
      return result.sdp
    } catch (e) {
      logger.error('Error sending view command, error: ', e)
      throw e
    }
  }

  /**
   * Establish a WebRTC connection with the Millicast server in the Publisher role.
   * @param {String} sdp - The SDP information created by your offer.
   * @param {SignalingPublishOptions} options - Signaling Publish Options.
   * @example const response = await millicastSignaling.publish(sdp, {codec: 'h264'})
   * @return {Promise<String>} Promise object which represents the SDP command response.
   */
  async publish (sdp, options, record = null, sourceId = null) {
    const optionsParsed = getPublishOptions(options, record, sourceId)

    logger.info(`Starting publishing to streamName: ${this.streamName}, codec: ${optionsParsed.codec}`)
    logger.debug('Publishing local description: ', sdp)
    const supportedVideoCodecs = PeerConnection.getCapabilities?.('video')?.codecs?.map((cdc) => cdc.codec) ?? []

    const videoCodecs = Object.values(VideoCodec)
    if (videoCodecs.indexOf(optionsParsed.codec) === -1) {
      logger.error(`Invalid codec ${optionsParsed.codec}. Possible values are: `, videoCodecs)
      throw new Error(`Invalid codec ${optionsParsed.codec}. Possible values are: ${videoCodecs}`)
    }

    if (supportedVideoCodecs.length > 0 && supportedVideoCodecs.indexOf(optionsParsed.codec) === -1) {
      logger.error(`Unsupported codec ${optionsParsed.codec}. Possible values are: `, supportedVideoCodecs)
      throw new Error(`Unsupported codec ${optionsParsed.codec}. Possible values are: ${supportedVideoCodecs}`)
    }

    // Signaling server only recognizes 'AV1' and not 'AV1X'
    if (optionsParsed.codec === VideoCodec.AV1) {
      sdp = SdpParser.adaptCodecName(sdp, 'AV1X', VideoCodec.AV1)
    }

    const data = {
      name: this.streamName,
      sdp,
      codec: optionsParsed.codec,
      sourceId: optionsParsed.sourceId
    }

    if (optionsParsed.priority) {
      if (Number.isInteger(optionsParsed.priority) && optionsParsed.priority >= -2147483648 && optionsParsed.priority <= 2147483647) {
        data.priority = optionsParsed.priority
      } else {
        throw new Error('Invalid value for priority option. It should be an integer in the range [-2^31, 2^31 - 1]')
      }
    }

    if (optionsParsed.record !== null) {
      data.record = optionsParsed.record
    }
    if (Array.isArray(optionsParsed.events)) {
      data.events = optionsParsed.events
    }
    try {
      if (optionsParsed.disableVideo && optionsParsed.disableAudio) {
        throw new Error('Not attempting to connect as video and audio are disabled')
      }
      await this.connect()
      logger.info('Sending publish command')
      const result = await this.transactionManager.cmd('publish', data)

      if (optionsParsed.codec === VideoCodec.AV1) {
        // If browser supports AV1X, we change from AV1 to AV1X
        const AV1X = RTCRtpSender.getCapabilities?.('video')?.codecs?.find?.((codec) => codec.mimeType === 'video/AV1X')
        result.sdp = AV1X ? SdpParser.adaptCodecName(result.sdp, VideoCodec.AV1, 'AV1X') : result.sdp
      }

      logger.info('Command sent, publisherId: ', result.publisherId)
      logger.debug('Command result: ', result)
      this.serverId = result.publisherId
      this.clusterId = result.clusterId

      // Save for diagnostics
      Diagnostics.initStreamName(this.streamName)
      Diagnostics.initSubscriberId(this.serverId)
      Diagnostics.initFeedId(result.feedId)
      Diagnostics.setClusterId(this.clusterId)
      return result.sdp
    } catch (e) {
      logger.error('Error sending publish command, error: ', e)
      throw e
    }
  }

  /**
   * Send command to the server.
   * @param {String} cmd - Command name.
   * @param {Object} [data] - Command parameters.
   * @return {Promise<Object>} Promise object which represents the command response.
   */
  async cmd (cmd, data) {
    logger.info(`Sending cmd: ${cmd}`)

    return this.transactionManager.cmd(cmd, data)
  }
}

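// Normalizes the subscribe arguments: accepts either a SignalingSubscribeOptions
// object or the legacy positional form subscribe(sdp, vad, pinnedSourceId, excludedSourceIds),
// in which case `options` carries the vad flag.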
const getSubscribeOptions = (options, legacyPinnedSourceId, legacyExcludedSourceIds) => {
  let parsedOptions = typeof options === 'object' && options !== null ? options : {}
  if (Object.keys(parsedOptions).length === 0) {
    parsedOptions = {
      vad: options,
      pinnedSourceId: legacyPinnedSourceId,
      excludedSourceIds: legacyExcludedSourceIds
    }
  }
  return parsedOptions
}

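// Normalizes the publish arguments: accepts either a SignalingPublishOptions object
// or the legacy positional form publish(sdp, codec, record, sourceId); when no codec
// is given it defaults to H.264.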
const getPublishOptions = (options, legacyRecord, legacySourceId) => {
  let parsedOptions = typeof options === 'object' && options !== null ? options : {}
  if (Object.keys(parsedOptions).length === 0) {
    const defaultCodec = VideoCodec.H264
    parsedOptions = {
      codec: options ?? defaultCodec,
      record: legacyRecord,
      sourceId: legacySourceId
    }
  }
  return parsedOptions
}