import { forwardRef, Inject, Injectable } from '@angular/core';
import { each as _each, flatten as _flatten, Dictionary } from 'underscore';
import { Observable as rxjsObservable, of } from 'rxjs';
import { switchMap, map, scan, startWith } from 'rxjs/operators';

import { isString } from 'rev-shared/util';

import { Deferred } from 'rev-shared/util/Deferred';
import { IHubProxy } from './IHubProxy';
import { IPushObservableMessage } from './IPushObservableMessage';
import { IUnsubscribe } from './IUnsubscribe';
import { MessageHandler } from './MessageHandler';
import { Observable } from './Observable';
import { PushHubToken } from './PushHubToken';
import { SignalRHubsConnection, SignalRHubsConnectionState } from './SignalRHubsConnection';
import { SignalRHubsConnectionToken } from './SignalRHubsConnectionToken';

interface IAwaitMessageOptions {
	events: string|string[];
	rejectEvents?: string|string[];
	route: string;
	routeScope?: string;
	timeout?: number;
}

interface IMessage {
	content: any;
	type: string;
}

interface IBatchMessage {
	content: IMessage;
	type: string;
}

interface ISubscriptionPromise<T> extends Promise<T> {
	subscribed: Promise<void>;
}

interface IPushReducer<TState> {
	(state: TState, messageData: any): TState;
}
interface IPushReducers<TState> {
	[key: string]: IPushReducer<TState>;
}

interface IListeners {
	[key: string]: Observable;
}

@Injectable({
	providedIn: 'root'
})
export class PushBus {
	private routeListeners: IListeners;

	//private PushHub: any;
	//private hubsConnection: any;

	constructor(
		@Inject(forwardRef(() => PushHubToken)) private PushHub: IHubProxy,
		@Inject(forwardRef(() => SignalRHubsConnectionToken)) private hubsConnection: SignalRHubsConnection
	) {
		this.routeListeners = {};

		this.configureInboundMessageListener();
	}

	/*
	 * Subscribes to the provided route, and waits for the message to arrive
	 * returns a promise that resolves with the message
	 * options:{
	 *   route, routeScope,
	 *   timeout: milliseconds. Promise will be rejected if no event comes in within the time limit
	 *   events: messages to wait for,
	 *   rejectEvents: messages that indicate a failure. cause result promise to be rejected
	 * }
	 *
	 * returns promise resolved with:
	 *	eventType: the name of the event received
	 *	message: body of the message
	*/
	public awaitMessage(opts: IAwaitMessageOptions): ISubscriptionPromise<any> {
		return this.awaitMessageInternal(this.getQualifiedRoute(opts.route, opts.routeScope), opts.timeout, opts.events, opts.rejectEvents);
	}

	// wraps unsubscribe from multiple routes into single unsubscribe function
	public composeUnsubscribeFn(unsubscribeFns: Array<IUnsubscribe | (() => void)>): IUnsubscribe {

		const unsubscribe: IUnsubscribe = () => Promise.all(unsubscribeFns.map(fn => fn?.())) as Promise<any>;

		unsubscribe.subscribed = Promise.resolve();

		return unsubscribe;
	}

	public resubscribe(): void {
		console.log('Resubscribing push routes');

		_each(this.routeListeners,
			(listener: any, route: string) =>
				this.subscribeRoute(route)
					.catch((error: any) => console.error('Error resubscribing to route: ', route, error))
		);
	}

	public subscribe(route: string, handlers: Dictionary<MessageHandler>): IUnsubscribe;
	public subscribe(route: string, scope: string, handlers: Dictionary<MessageHandler>, autoSubscribed?: boolean): IUnsubscribe;

	/**
	 * route, routescope - used to build the qualified route being subscribed to.
	 * handlers: object containing message handlers
	 * autoSubscribed: only used for commandId - where the backend is responsible for initiating the subscription
	 */
	public subscribe(route: string, scopeOrHandlers: string | Dictionary<MessageHandler>, handlers?: Dictionary<MessageHandler>, autoSubscribed?: boolean): IUnsubscribe {
		if(!route) {
			throw new Error('Missing route on subscription');
		}

		if(arguments.length === 2) {
			return this.subscribeInternal(route, scopeOrHandlers as Dictionary<MessageHandler>);
		}

		const fullRoute = this.getQualifiedRoute(route, scopeOrHandlers as string);

		return this.subscribeInternal(fullRoute, handlers, autoSubscribed);
	}

	public getRxJsObservable(route: string, scope: string, types: string[]): rxjsObservable<IPushObservableMessage> {

		return new rxjsObservable<IPushObservableMessage>(subscriber =>
			this.subscribe(route, scope, types.reduce((handlers, type) => {
				handlers[type] = data => subscriber.next({ data, type });
				return handlers;
			}, {})));
	}

	public getObservable(route: string, scope: string, handlers: Dictionary<(data) => rxjsObservable<any>>): rxjsObservable<IPushObservableMessage> {
		if(!route) {
			throw new Error('missing route');
		}

		return this.getRxJsObservable(route, scope, Object.keys(handlers)).pipe(
			switchMap(msg => handlers[msg.type](msg.data).pipe(
				map(data => ({ type: msg.type, data }))
			))
		);
	}

	//This allows a redux-style binding to a push route. Each push handler maps (state, message) to the next state.
	public scanPushObservable<TState>(route: string, scope: string, handlers: IPushReducers<TState>, initial: TState): rxjsObservable<TState> {

		const pushMap = Object.keys(handlers).reduce((pushMaps, k) => ({ ...pushMaps, [k]: of }), {});

		return this.getObservable(route, scope, pushMap).pipe(
			scan((state: TState, msg: any) => handlers[msg.type](state, msg.data), initial),
			startWith(initial)
		);
	}

	public unsubscribe(route: string, handlers: Dictionary<MessageHandler>, suppressLog: boolean): Promise<void> {
		const listener = this.routeListeners[route];
		if(!listener) {
			console.log('Unsubscribe, not subcribed: ', + route);
			return Promise.resolve();
		}

		_each(handlers, (handler, messageType) => listener.off(messageType, handler));

		return this.unsubscribeRouteIfUnused(route, suppressLog);
	}

	private addMessageHandlers(route: string, handlers: any): void {
		const messageTypes: string[] = Object.keys(handlers);

		if(!messageTypes || !messageTypes.length) {
			throw new Error('At least one handler is required');
		}

		const listener = this.routeListeners[route];

		_each(messageTypes, (messageType: string) => listener.on(messageType, handlers[messageType]));
	}

	private awaitMessageInternal(route: string, timeout: number, events: string|string[], rejectEvents: string|string[]): ISubscriptionPromise<any> {
		const deferred: Deferred = new Deferred();
		let isComplete: boolean = false;

		const handlers = {} as Dictionary<MessageHandler>;

		_flatten([events]).forEach((event: string): void => {
			handlers[event] = (message: any) => {
				resolve({
					event,
					message
				}, true);
			};
		});

		_flatten([rejectEvents]).forEach((event: string): void => {
			handlers[event] = (message: any) => {
				resolve({
					event,
					message
				}, false);
			};
		});

		if(timeout) {
			window.setTimeout(resolve, timeout);
		}

		const unsubscribe = this.subscribeInternal(route, handlers);
		const promise = deferred.promise as ISubscriptionPromise<any>;
		promise.subscribed = unsubscribe.subscribed;

		return promise;

		function resolve(result: any, success: boolean) {
			if(!isComplete) {
				isComplete = true;
				unsubscribe();

				if(success) {
					deferred.resolve(result);
				} else {
					deferred.reject(result);
				}
			}
		}
	}

	private configureInboundMessageListener(): void {
		this.PushHub.client.on({
			routeMessage: (route: string, messageType: string, message: string): void =>
				this.routeMessageInternal(route, messageType, message),

			routeBatchMessages: (route: string, messages: any) => {
				const listener: any = this.routeListeners[route];
				const parsedMessages: IBatchMessage[] = this.parseMessage(messages, route);
				messages.forEach((message: IBatchMessage) => this.routeMessage(route, message.type, message.content, listener));
			}
		});
	}

	private ensureSubscribed(route: string, autoSubscribed: boolean): Promise<void> {
		if (!route) {
			throw new Error('Route is required');
		}

		let promise: Promise<void>;

		if (!this.routeListeners[route]) {
			if(!autoSubscribed) {
				promise = this.subscribeRoute(route);
			}

			this.routeListeners[route] = new Observable();
		}

		return promise || Promise.resolve();
	}

	private getQualifiedRoute(route: string, scope: string): string {
		if (scope) {
			return route + ':' + scope;
		}

		return route;
	}

	private parseMessage(messageContent: string, logTag: string): any {
		let message: any;

		try {
			message = JSON.parse(messageContent);
		} catch(e) {
			console.log('Unable to parse message: ', logTag);
			message = null;
		}

		return message;
	}

	private routeMessage(route: string, messageType: string, message: IMessage, listener: any): void {
		let suppress: boolean = false;

		if (messageType === 'CommandFinished') {
			if (message && message.type === 'MessageScheduled' || message.type === 'SessionTimeoutExtended') {
				suppress = true;
			}
		}

		if (!suppress) {
			console.log('Inbound message: ', route, messageType, message);
		}

		//TODO: remove 2nd message param.
		listener.fire(messageType, message, message);
	}

	private routeMessageInternal(route: string, messageType: string, messageJson: string): void {
		const listener: any = this.routeListeners[route];

		if (listener && listener.hasSubscribers(messageType)) {
			const messageContent = this.parseMessage(messageJson, route + ' ' + messageType);
			this.routeMessage(route, messageType, messageContent, listener);
		} else if(messageType !== 'CommandFinished') {
			console.warn('Unhandled Inbound push message: ', route, messageType);
		}
	}

	private subscribeInternal(route: string, handlers: Dictionary<MessageHandler>, autoSubscribed: boolean = false): IUnsubscribe {
		const subscribed = this.ensureSubscribed(route, autoSubscribed);

		this.addMessageHandlers(route, handlers);

		const unsubscribe = (() => this.unsubscribe(route, handlers, false)) as IUnsubscribe;
		unsubscribe.subscribed = subscribed;

		return unsubscribe;
	}

	private subscribeRoute(route: string): Promise<void> {
		if (this.hubsConnection.getConnectionStatus() === SignalRHubsConnectionState.Connected) {
			console.log('Subscribe to route: ', route);

			return this.PushHub.server.subscribe(route)
				.catch((error: any): Promise<never> => {
					console.error('Error subscribing to route: ', route, error);

					return Promise.reject(error);
				});
		}
	}

	private unsubscribeRoute(route: string, suppressLog: boolean): Promise<void> {
		if(!suppressLog) {
			console.log('Unsubscribe Route: ', route);
		}
		return this.PushHub.server.unsubscribe(route)
			.catch((error: any): Promise<never> => {
				console.error('Error unsubscribing from route: ', route, error);

				return Promise.reject(error);
			});
	}

	private unsubscribeRouteIfUnused(route: string, suppressLog: boolean): Promise<void> {
		const listener = this.routeListeners[route];

		if (!listener.hasSubscribers()) {
			delete this.routeListeners[route];

			if(this.hubsConnection.getConnectionStatus() === SignalRHubsConnectionState.Connected) {
				return this.unsubscribeRoute(route, suppressLog);
			}
		}

		return Promise.resolve();
	}
}
