export interface ISSEOptions { headers?: Record; payload?: string; method?: string; withCredentials?: boolean; debug?: boolean; start?: boolean; } interface SSEEvent extends Event { id?: string; source?: SSE; readyState?: number; data?: string; } interface SSEEventListener { (e: SSEEvent): void; } type OnEvent = `on${string}`; type EventType = 'message' | 'error' | 'readystatechange' | 'abort' | 'open'; const FIELD_SEPARATOR = ':'; export default class SSE { public static INITIALIZING = -1; public static CONNECTING = 0; public static OPEN = 1; public static CLOSED = 2; private headers: Record; private payload: string; private method: string; private withCredentials: boolean; private debug: boolean; private listeners: Record = {}; private xhr: XMLHttpRequest | null = null; private readyState: number = SSE.INITIALIZING; private progress = 0; private chunk = ''; [key: OnEvent]: SSEEventListener | undefined; constructor(private url: string, options: ISSEOptions = {}) { this.headers = options.headers || {}; this.payload = options.payload !== undefined ? options.payload : ''; this.method = options.method || (this.payload ? 'POST' : 'GET'); this.withCredentials = !!options.withCredentials; this.debug = !!options.debug; if (options.start === undefined || options.start) { this.stream(); } } addEventListener(type: EventType, listener: SSEEventListener) { if (this.listeners[type] === undefined) { this.listeners[type] = []; } if (this.listeners[type].indexOf(listener) === -1) { this.listeners[type].push(listener); } } removeEventListener(type: EventType, listener: SSEEventListener) { if (this.listeners[type] === undefined) { return; } const filtered: SSEEventListener[] = []; this.listeners[type].forEach((element) => { if (element !== listener) { filtered.push(element); } }); if (filtered.length === 0) { delete this.listeners[type]; } else { this.listeners[type] = filtered; } } dispatchEvent(e: SSEEvent | null) { if (!e) { return true; } if (this.debug) { console.debug(e); } e.source = this; const onHandler: OnEvent = `on${e.type}`; if (this.hasOwnProperty(onHandler) && this[onHandler]) { this[onHandler].call(this, e); if (e.defaultPrevented) { return false; } } if (this.listeners[e.type]) { return this.listeners[e.type].every((callback) => { callback(e); return !e.defaultPrevented; }); } return true; } private _setReadyState(state: number) { const event: SSEEvent = new CustomEvent('readystatechange'); event.readyState = state; this.readyState = state; this.dispatchEvent(event); } private _onStreamFailure = (e: ProgressEvent) => { const event: SSEEvent = new CustomEvent('error'); if (e.currentTarget instanceof XMLHttpRequest) { event.data = e.currentTarget.response; } this.dispatchEvent(event); this.close(); } private _onStreamAbort = () => { this.dispatchEvent(new CustomEvent('abort')); this.close(); } private _onStreamProgress = (e: ProgressEvent) => { if (!this.xhr) { return; } if (this.xhr.status !== 200) { this._onStreamFailure(e); return; } if (this.readyState === SSE.CONNECTING) { this.dispatchEvent(new CustomEvent('open')); this._setReadyState(SSE.OPEN); } const data = this.xhr.responseText.substring(this.progress); this.progress += data.length; const parts = (this.chunk + data).split(/(\r\n|\r|\n){2}/g); // we assume that the last chunk can be incomplete because of buffering or other network effects // so we always save the last part to merge it with the next incoming packet const lastPart = parts.pop(); parts.forEach((part) => { if (part.trim().length > 0) { this.dispatchEvent(this._parseEventChunk(part)); } }); this.chunk = lastPart ?? ''; } private _onStreamLoaded = (e: ProgressEvent) => { this._onStreamProgress(e); // Parse the last chunk. this.dispatchEvent(this._parseEventChunk(this.chunk)); this.chunk = ''; } /** * Parse a received SSE event chunk into a constructed event object. * * Reference: https://html.spec.whatwg.org/multipage/server-sent-events.html#dispatchMessage */ private _parseEventChunk(chunk: string) { if (!chunk || chunk.length === 0) { return null; } if (this.debug) { console.debug(chunk); } const e: Record = { 'id': '', 'retry': '', 'data': '', 'event': 'message' }; chunk.split(/\n|\r\n|\r/).forEach((line) => { line = line.trimEnd(); const index = line.indexOf(FIELD_SEPARATOR); if (index <= 0) { // Line was either empty, or started with a separator and is a comment. // Either way, ignore. return; } const field = line.substring(0, index); if (!(field in e)) { return; } // only first whitespace should be trimmed const skip = (line[index + 1] === ' ') ? 2 : 1; const value = line.substring(index + skip); // consecutive 'data' is concatenated with newlines if (field === 'data' && e[field] !== null) { e['data'] += "\n" + value; } else { e[field] = value; } }); const event: SSEEvent = new CustomEvent(e.event); event.data = e.data || ''; event.id = e.id; return event; }; private _checkStreamClosed = () => { if (!this.xhr) { return; } if (this.xhr.readyState === XMLHttpRequest.DONE) { this._setReadyState(SSE.CLOSED); } }; /** * starts the streaming */ stream() { if (this.xhr) { // Already connected. return; } this._setReadyState(SSE.CONNECTING); this.xhr = new XMLHttpRequest(); this.xhr.addEventListener('progress', this._onStreamProgress); this.xhr.addEventListener('load', this._onStreamLoaded); this.xhr.addEventListener('readystatechange', this._checkStreamClosed); this.xhr.addEventListener('error', this._onStreamFailure); this.xhr.addEventListener('abort', this._onStreamAbort); this.xhr.open(this.method, this.url); for (const header in this.headers) { this.xhr.setRequestHeader(header, this.headers[header]); } this.xhr.withCredentials = this.withCredentials; this.xhr.send(this.payload); }; /** * closes the stream * @type Close */ close() { if (this.readyState === SSE.CLOSED) { return; } this.xhr?.abort(); this.xhr = null; this._setReadyState(SSE.CLOSED); }; }