import jwtDecode from 'jwt-decode'
import reemit from 're-emitter'
import { atob } from 'Base64'
import joi from 'joi'
import Logger from './Logger'
import BaseWebRTC from './utils/BaseWebRTC'
import Signaling, { signalingEvents } from './Signaling'
import { DOLBY_SDK_TIMESTAMP_UUID, VideoCodec } from './utils/Codecs'
import PeerConnection, { webRTCEvents, ConnectionType } from './PeerConnection'
import FetchError from './utils/FetchError'
import { supportsInsertableStreams, supportsRTCRtpScriptTransform } from './utils/StreamTransform'
import TransformWorker from './workers/TransformWorker.worker.js?worker&inline'
// Module-scoped logger; Publish also hands it to the BaseWebRTC constructor.
const logger = Logger.get('Publish')
// Default broadcast options. Publish#connect spreads the caller-supplied options
// over these, and merges `peerConfig` one level deep.
const connectOptions = {
// No default stream: initConnection throws 'MediaStream required' while this is falsy.
mediaStream: null,
// 0 means unlimited; the bandwidth restriction is only applied for values > 0.
bandwidth: 0,
// Metadata insertion is off by default; connect() additionally forces it off
// unless the codec is H264 and video is enabled.
metadata: false,
disableVideo: false,
disableAudio: false,
codec: VideoCodec.H264,
simulcast: false,
scalabilityMode: null,
// Defaults merged under any caller-supplied peerConfig.
// NOTE(review): presumably consumed by PeerConnection to start stats collection
// and set its polling interval — confirm against PeerConnection.
peerConfig: {
autoInitStats: true,
statsIntervalMs: 1000
}
}
/**
* @class Publish
* @extends BaseWebRTC
* @classdesc Manages connection with a secure WebSocket path to signal the Millicast server
* and establishes a WebRTC connection to broadcast a MediaStream.
*
* Before you can broadcast, you will need:
*
* - [MediaStream](https://developer.mozilla.org/en-US/docs/Web/API/Media_Streams_API) which has at most one audio track and at most one video track. The tracks it contains are the ones that will be streamed.
*
* - A connection path that you can get from {@link Director} module or from your own implementation.
* @constructor
* @param {String} streamName - Deprecated: Millicast existing stream name.
* @param {tokenGeneratorCallback} tokenGenerator - Callback function executed when a new token is needed.
* @param {Boolean} [autoReconnect=true] - Enable auto reconnect to stream.
*/
export default class Publish extends BaseWebRTC {
  /**
   * @param {String} streamName - Deprecated and ignored apart from a warning; the stream name is read from the token.
   * @param {tokenGeneratorCallback} tokenGenerator - Callback function executed when a new token is needed.
   * @param {Boolean} [autoReconnect=true] - Enable auto reconnect to stream.
   */
  constructor (streamName, tokenGenerator, autoReconnect = true) {
    if (streamName) {
      logger.warn('The streamName property has been deprecated. In a future release, this will be removed. Please do not rely on this value. Instead, set via token generator')
    }
    // The stream name is always passed as null; initConnection fills it in from the JWT.
    super(null, tokenGenerator, logger, autoReconnect)
  }

  /**
   * Starts broadcast to an existing stream name.
   *
   * In the example, `getYourMediaStream` and `getYourPublisherConnection` are your own implementations.
   * @param {Object} options - General broadcast options.
   * @param {String} options.sourceId - Source unique id. Only available if stream is multisource.
   * @param {Boolean} [options.stereo = false] - True to modify SDP for support stereo. Otherwise False.
   * @param {Boolean} [options.dtx = false] - True to modify SDP for supporting dtx in opus. Otherwise False.
   * @param {Boolean} [options.absCaptureTime = false] - True to modify SDP for supporting absolute capture time header extension. Otherwise False.
   * @param {Boolean} [options.dependencyDescriptor = false] - True to modify SDP for supporting aom dependency descriptor header extension. Otherwise False.
   * @param {MediaStream|Array<MediaStreamTrack>} options.mediaStream - MediaStream to offer in a stream. This object must have
   * 1 audio track and 1 video track, or at least one of them. Alternatively you can provide both tracks in an array.
   * @param {Number} [options.bandwidth = 0] - Broadcast bandwidth. 0 for unlimited.
   * @param {Boolean} [options.metadata = false] - Enable metadata insertion if stream is compatible.
   * @param {Boolean} [options.disableVideo = false] - Disable the opportunity to send video stream.
   * @param {Boolean} [options.disableAudio = false] - Disable the opportunity to send audio stream.
   * @param {VideoCodec} [options.codec = 'h264'] - Codec for publish stream.
   * @param {Boolean} [options.simulcast = false] - Enable simulcast. **Only available in Chromium based browsers and with H.264 or VP8 video codecs.**
   * @param {String} [options.scalabilityMode = null] - Selected scalability mode. You can get the available capabilities using <a href="PeerConnection#.getCapabilities">PeerConnection.getCapabilities</a> method.
   * **Only available in Google Chrome.**
   * @param {RTCConfiguration} [options.peerConfig = null] - Options to configure the new RTCPeerConnection.
   * @param {Boolean} [options.record = false ] - Enable stream recording. If record is not provided, use default Token configuration. **Only available in Tokens with recording enabled.**
   * @param {Array<String>} [options.events = null] - Specify which events will be delivered by the server (any of "active" | "inactive" | "viewercount").
   * @param {Number} [options.priority = null] - When multiple ingest streams are provided by the customer, add the ability to specify a priority between all ingest streams. Decimal integer between the range [-2^31, +2^31 - 1]. For more information, visit [our documentation](https://docs.dolby.io/streaming-apis/docs/backup-publishing).
   * @returns {Promise<void>} Promise object which resolves when the broadcast started successfully.
   * @fires PeerConnection#connectionStateChange
   * @fires Signaling#broadcastEvent
   * @example await publish.connect(options)
   * @example
   * import Publish from '@millicast/sdk'
   *
   * //Define callback for generate new token
   * const tokenGenerator = () => getYourPublisherConnection(token, streamName)
   *
   * //Create a new instance
   * // streamName is not necessary in the constructor anymore, could be null or undefined
   * const streamName = "My Millicast Stream Name"
   * const millicastPublish = new Publish(streamName, tokenGenerator)
   *
   * //Get MediaStream
   * const mediaStream = getYourMediaStream()
   *
   * //Options
   * const broadcastOptions = {
   *    mediaStream: mediaStream
   *  }
   *
   * //Start broadcast
   * try {
   *  await millicastPublish.connect(broadcastOptions)
   * } catch (e) {
   *  console.log('Connection failed, handle error', e)
   * }
   */
  async connect (options = connectOptions) {
    const schema = joi.object({
      sourceId: joi.string(),
      stereo: joi.boolean(),
      dtx: joi.boolean(),
      absCaptureTime: joi.boolean(),
      dependencyDescriptor: joi.boolean(),
      mediaStream: joi
        .alternatives()
        .try(
          joi.array().items(joi.object()),
          joi.object()
        ),
      bandwidth: joi.number(),
      metadata: joi.boolean(),
      disableVideo: joi.boolean(),
      disableAudio: joi.boolean(),
      codec: joi.string().valid(...Object.values(VideoCodec)),
      simulcast: joi.boolean(),
      scalabilityMode: joi.string(),
      peerConfig: joi.object(),
      record: joi.boolean(),
      events: joi.array().items(joi.string().valid('active', 'inactive', 'viewercount')),
      priority: joi.number()
    })
    // Validation is advisory only: a failure is logged and the connection proceeds.
    const { error, value } = schema.validate(options)
    if (error) logger.warn(error, value)

    // Build a fresh options object (and a fresh peerConfig) so the module-level
    // defaults are never mutated. `options?.` keeps an explicit `null` argument
    // from crashing here; it then fails below with 'MediaStream required'.
    this.options = {
      ...connectOptions,
      ...options,
      peerConfig: { ...connectOptions.peerConfig, ...options?.peerConfig },
      setSDPToPeer: false
    }
    // Metadata insertion is only honoured for H264 with video enabled.
    this.options.metadata =
      this.options.metadata &&
      this.options.codec === VideoCodec.H264 &&
      !this.options.disableVideo

    await this.initConnection({ migrate: false })
  }

  /**
   * Refreshes the stored media tracks from the current peer (when there is one)
   * and delegates to BaseWebRTC#reconnect.
   * @param {Object} [data] - Forwarded unchanged to the base implementation.
   * @returns {Promise<void>} Settles once the base reconnect call has settled.
   */
  async reconnect (data) {
    this.options.mediaStream = this.webRTCPeer?.getTracks() ?? this.options.mediaStream
    // Awaited so that callers awaiting reconnect() do not resume before the attempt finishes.
    await super.reconnect(data)
  }

  /**
   * Re-runs the connection setup in migration mode, reusing the tracks of the
   * current peer when it exists.
   * @returns {Promise<void>}
   */
  async replaceConnection () {
    logger.info('Migrating current connection')
    this.options.mediaStream = this.webRTCPeer?.getTracks() ?? this.options.mediaStream
    await this.initConnection({ migrate: true })
  }

  /**
   * Initialize recording in an active stream and change the current record option.
   * Logs an error and does nothing when recording is not available.
   * @returns {Promise<void>}
   */
  async record () {
    if (this.recordingAvailable) {
      this.options.record = true
      await this.signaling?.cmd('record')
      logger.info('Broadcaster start recording')
    } else {
      logger.error('Record not available')
    }
  }

  /**
   * Finalize recording in an active stream and change the current record option.
   * Logs an error and does nothing when recording is not available.
   * @returns {Promise<void>}
   */
  async unrecord () {
    if (this.recordingAvailable) {
      this.options.record = false
      await this.signaling?.cmd('unrecord')
      logger.info('Broadcaster stop recording')
    } else {
      logger.error('Unrecord not available')
    }
  }

  /**
   * Stops the broadcast through the base class and terminates the metadata
   * transform worker, if one was created.
   */
  stop () {
    super.stop()
    this.worker?.terminate()
    this.worker = null
  }

  /**
   * Requests a token, opens signaling, negotiates SDP and installs the resulting
   * peer and signaling instances on this object.
   * @param {Object} [data] - Connection flags.
   * @param {Boolean} [data.migrate=false] - When true, a new PeerConnection is built
   * next to the current one, and the previous peer and signaling are closed once the
   * new peer reports a connection state change.
   * @returns {Promise<void>}
   * @throws {Error} 'MediaStream required' when no media stream is configured.
   * @throws {Error} 'Broadcast currently working' when already active and not migrating.
   * @throws {Error} 'Publisher data required' when the token generator yields nothing.
   * @throws {Error} 'Record option detected but recording is not available'.
   * @throws Rethrows whatever the token generator throws.
   */
  async initConnection (data = { migrate: false }) {
    logger.debug('Broadcast option values: ', this.options)
    this.stopReconnection = false
    if (!this.options.mediaStream) {
      logger.error('Error while broadcasting. MediaStream required')
      throw new Error('MediaStream required')
    }
    if (!data.migrate && this.isActive()) {
      logger.warn('Broadcast currently working')
      throw new Error('Broadcast currently working')
    }

    let publisherData
    try {
      publisherData = await this.tokenGenerator()
      // Set the iceServers from the publish data into the peerConfig
      this.options.peerConfig.iceServers = publisherData?.iceServers
      this.options.peerConfig.encodedInsertableStreams = this.options.metadata
    } catch (error) {
      logger.error('Error generating token.')
      if (error instanceof FetchError) {
        if (error.status === 401 || !this.autoReconnect) {
          // should not reconnect
          this.stopReconnection = true
        } else {
          // should reconnect with exponential back off if autoReconnect is true
          // (not awaited: the error is rethrown to the caller right away)
          this.reconnect()
        }
      }
      throw error
    }
    if (!publisherData) {
      logger.error('Error while broadcasting. Publisher data required')
      throw new Error('Publisher data required')
    }

    const decodedJWT = jwtDecode(publisherData.jwt)
    this.streamName = decodedJWT.millicast.streamName
    // atob('bWlsbGljYXN0') decodes to 'millicast', i.e. the same claim object as above.
    this.recordingAvailable = decodedJWT[atob('bWlsbGljYXN0')].record
    if (this.options.record && !this.recordingAvailable) {
      logger.error('Error while broadcasting. Record option detected but recording is not available')
      throw new Error('Record option detected but recording is not available')
    }

    const signalingInstance = new Signaling({
      streamName: this.streamName,
      url: `${publisherData.urls[0]}?token=${publisherData.jwt}`
    })
    const webRTCPeerInstance = data.migrate ? new PeerConnection() : this.webRTCPeer
    await webRTCPeerInstance.createRTCPeer(this.options.peerConfig, ConnectionType.Publisher)

    // Stop emiting events from the previous instances
    this.stopReemitingWebRTCPeerInstanceEvents?.()
    this.stopReemitingSignalingInstanceEvents?.()
    // And start emitting from the new ones
    this.stopReemitingWebRTCPeerInstanceEvents = reemit(webRTCPeerInstance, this, [webRTCEvents.connectionStateChange])
    this.stopReemitingSignalingInstanceEvents = reemit(signalingInstance, this, [signalingEvents.broadcastEvent])

    // Local SDP generation and the signaling connection run concurrently.
    const [localSdp] = await Promise.all([
      webRTCPeerInstance.getRTCLocalSDP(this.options),
      signalingInstance.connect()
    ])

    if (this.options.metadata) {
      if (!this.worker) {
        this.worker = new TransformWorker()
      }
      // Take the senders from the peer being set up. While migrating,
      // this.webRTCPeer still refers to the previous peer, whose senders
      // must not be transformed again.
      const senders = webRTCPeerInstance.peer.getSenders()
      for (const sender of senders) {
        if (supportsRTCRtpScriptTransform) {
          // eslint-disable-next-line no-undef
          sender.transform = new RTCRtpScriptTransform(this.worker, {
            name: 'senderTransform',
            codec: this.options.codec
          })
        } else if (supportsInsertableStreams) {
          const { readable, writable } = sender.createEncodedStreams()
          // Both streams are transferred to the worker, not copied.
          this.worker.postMessage({
            action: 'insertable-streams-sender',
            codec: this.options.codec,
            readable,
            writable
          }, [readable, writable])
        }
      }
    }

    // Keep the previous signaling so it can be closed after a migration.
    let oldSignaling = this.signaling
    this.signaling = signalingInstance

    const [publishedSdp] = await Promise.all([
      this.signaling.publish(localSdp, this.options),
      webRTCPeerInstance.peer.setLocalDescription(webRTCPeerInstance.sessionDescription)
    ])
    let remoteSdp = publishedSdp
    if (!this.options.disableVideo && this.options.bandwidth > 0) {
      remoteSdp = webRTCPeerInstance.updateBandwidthRestriction(remoteSdp, this.options.bandwidth)
    }
    await webRTCPeerInstance.setRTCRemoteSDP(remoteSdp)
    logger.info('Broadcasting to streamName: ', this.streamName)

    // Keep the previous peer so it can be closed after a migration.
    let oldWebRTCPeer = this.webRTCPeer
    this.webRTCPeer = webRTCPeerInstance
    this.setReconnect()

    if (data.migrate) {
      // Release the previous signaling and peer as soon as the new peer reports
      // one of these states; the references are nulled so this happens only once.
      this.webRTCPeer.on(webRTCEvents.connectionStateChange, (state) => {
        if (['connected', 'disconnected', 'failed', 'closed'].includes(state)) {
          oldSignaling?.close?.()
          oldWebRTCPeer?.closeRTCPeer?.()
          oldSignaling = oldWebRTCPeer = null
        }
      })
    }
  }

  /**
   * Send SEI user unregistered data as part of the frame being streamed. Only available for H.264 codec.
   * When metadata is not enabled or no transform worker exists, nothing is sent
   * and a warning listing the reasons is logged.
   * @param {String | Object} message The data to be sent as SEI user unregistered data.
   * @param {String} [uuid="d40e38ea-d419-4c62-94ed-20ac37b4e4fa"] String with UUID format as hex digit (XXXX-XX-XX-XX-XXXXXX).
   */
  sendMetadata (message, uuid = DOLBY_SDK_TIMESTAMP_UUID) {
    if (this.options?.metadata && this.worker) {
      this.worker.postMessage({
        action: 'metadata-sei-user-data-unregistered',
        uuid,
        payload: message
      })
      return
    }

    // Build a warning that explains why the metadata could not be sent.
    let warningMessage = 'Could not send metadata due to:'
    if (!this.options) {
      // connect() has never stored any options.
      warningMessage += '\n- Stream not being published.'
    } else if (!this.options.metadata) {
      warningMessage += '\n- Metadata option is not enabled.'
      if (this.options.codec !== VideoCodec.H264) {
        warningMessage += '\n- Incompatible codec. Only H264 available.'
      }
      if (this.options.disableVideo) {
        warningMessage += '\n- Video disabled.'
      }
    } else if (!this.worker) {
      warningMessage += '\n- Stream not being published.'
    }
    logger.warn(warningMessage)
  }
}