/* eslint-disable no-param-reassign */
import { uniqueId } from "lodash";
import { getPRPC, GetPRPC } from "../../hooks/usePRPC";
import createRPCQuery from "../../utils/createRPCQuery.util";
import createLogger from "../../utils/logger.util";

type SubscribeRequestFn<T> = (prpc: GetPRPC) => Promise<T>;

interface BaseSubscribeResponse {
	unsubscribe: () => Promise<void>;
	update: (...args: any[]) => Promise<void>;
}

interface Subscription<
	TResponse extends BaseSubscribeResponse = BaseSubscribeResponse,
> {
	/**
	 * Unique id of the subscription.
	 */
	id: string;

	/**
	 * Name of the subscription.
	 */
	name?: string;

	/**
	 * Additional options for the subscription.
	 */
	options?: SubscriptionAddOptions;

	/**
	 * Session id of the prpc session.
	 */
	sessionId: string;

	/**
	 * Creation timestamp.
	 */
	timestamp: number;

	/**
	 * Last reconnection timestamp.
	 */
	reconnectedAt?: number;

	/**
	 * Number of times the subscription has been reconnected.
	 */
	reconnectedTimes: number;

	/**
	 * The request function to subscribe to.
	 */
	requestFunction: (prpc: GetPRPC) => Promise<TResponse>;

	/**
	 * Actual response of the request function.
	 */
	requestFunctionResponse: TResponse;
}

interface SubscriptionAddOptions {
	name?: string;
	metadata?: Record<string, any>;

	/**
	 * Whether to throw an error if the prpc session is not available.
	 * @default false
	 */
	throwOnError?: boolean;
}

const prepareResponse = async (handler: Promise<any>) => {
	const response = await handler;
	if ("status" in response && "data" in response) {
		return createRPCQuery(() => response);
	}
	return response;
};

/**
 * Subscription pool used to manage subscriptions.
 *
 * Stores all subscriptions and reconnects them when the connection is lost.
 * Also wraps subscriptions to automatically use a new PRPC session.
 *
 * @example
 * ```ts
 * const subscription = await SubscriptionPool.add((prpc) => prpc.theirsModel.example.subscribe({...});
 * // Subscription initialized and ready to use
 *
 * // Later, when the connection is lost
 * await SubscriptionPool.onReconnect(newPrpcSession); // Reconnects all subscriptions
 *
 * await subscription.update(...); // Updates the new subscription with the new prpc session
 * await subscription.unsubscribe(); // Unsubscribes from the subscription and removes it from the pool
 * ```
 */
class SubscriptionPool {
	private static logger = createLogger({ name: "SubscriptionPool" });

	/**
	 * List of subscriptions that are currently active.
	 */
	public static subscriptions: Subscription[] = [];

	/**
	 * Adds a subscription to the pool.
	 * @param request The request function to subscribe to.
	 * @param name The name of the subscription. Used for debugging purposes.
	 * @returns The wrapped subscribe's response or null if the prpc session is not available.
	 */
	public static async add<T extends BaseSubscribeResponse>(
		request: SubscribeRequestFn<T>,
		options: SubscriptionAddOptions = {},
	) {
		const currentPrpcSession = getPRPC();

		if (!currentPrpcSession) {
			throw new Error(
				"PRPC Session is not available. Seems like you are not connected to the server.",
			);
		}

		try {
			const response = await prepareResponse(request(currentPrpcSession));
			const id = uniqueId("subscription-");
			const { sessionId } = currentPrpcSession;

			this.logger.info(
				`Add subscription to pool '${id}', sessionId=${sessionId}`,
			);

			// Subscribed successfully, add to the list of subscriptions
			// Wrap the unsubscribe and other functions to avoid errors from the old prpc session

			const subscription: Subscription = {
				id,
				sessionId,
				name: options?.name,
				options,
				reconnectedTimes: 0,
				timestamp: Date.now(),
				requestFunction: request,
				requestFunctionResponse: response,
			};

			this.subscriptions.push(subscription);

			return this.wrapSubscription(subscription, sessionId);
		} catch (error) {
			this.logger.error("Failed to add subscription to pool", error);
			throw error;
		}
	}

	/**
	 * Starts monitoring the active subscriptions at a specified interval.
	 * @param ms The interval in milliseconds. Default is 10,000 milliseconds (10 seconds).
	 */
	public static startMonitor(ms = 10_000) {
		setInterval(() => {
			this.logger.info(
				`Active subscriptions (${this.subscriptions.length}):`,
				this.subscriptions.map((item) => ({
					id: item.id,
					name: item.name,
					options: item.options,
					sessionId: item.sessionId,
					timestamp: item.timestamp,
					reconnectedAt: item.reconnectedAt,
					reconnectedTimes: item.reconnectedTimes,
				})),
			);
		}, ms);
	}

	/**
	 * Clears all subscriptions in the pool.
	 */
	public static async clear() {
		this.logger.info(
			"Subscriptions clearing, count:",
			this.subscriptions.length,
		);
		this.subscriptions = [];
	}

	/**
	 * Reconnects all subscriptions in the pool.
	 * @param prpc New prpc session object.
	 */
	public static async onReconnect(prpc: GetPRPC) {
		this.logger.info(
			`Received reconnect event (subscriptions=${this.subscriptions.length}), new session id:`,
			prpc.sessionId,
		);
		const promise = Promise.all(
			this.subscriptions.map(async (subscription) => {
				// Update the prpc session id first (to avoid errors when calling the unsubscribe function)
				subscription.sessionId = prpc.sessionId;

				const response = await subscription.requestFunction(prpc);
				// Now we have a new prpc session, the old functions no longer work
				// and it will be reassigned to the new session, thus restoring the subscription functions.
				subscription.requestFunctionResponse = response;
				subscription.reconnectedAt = Date.now();
				subscription.reconnectedTimes++;
			}),
		);

		return promise
			.catch((error) => this.logger.error("Failed to reconnect", error))
			.then(() => {
				this.logger.info(
					`Reconnected subscriptions (${this.subscriptions.length}):`,
					this.subscriptions,
				);
			});
	}

	/**
	 * Wraps a subscription object and adds additional functions to it based on the response object.
	 * @param subscription - The subscription object to wrap.
	 * @returns The wrapped subscription object with additional functions.
	 */
	protected static wrapSubscription<T extends BaseSubscribeResponse>(
		subscription: Subscription<T>,
		sessionId: string,
	): T {
		const response = { ...subscription.requestFunctionResponse };
		const functions = Object.keys(response).reduce((acc, key) => {
			const value = response[key];
			if (typeof value !== "function") return acc;
			if (key === "unsubscribe") {
				acc[key] = this.wrapUnsubscribeFunction(
					subscription.id,
					sessionId,
				);
			} else {
				acc[key] = this.wrapAnyFunction(subscription, key);
			}
			return acc;
		}, response as any);

		return {
			...response,
			...functions,
		};
	}

	/**
	 * Wraps the unsubscribe function for a subscription with the given ID.
	 * @param id - The ID of the subscription.
	 * @returns The wrapped unsubscribe function.
	 */
	protected static wrapUnsubscribeFunction(id: string, sessionId: string) {
		return () => {
			const index = this.subscriptions.findIndex((s) => s.id === id);
			const subscription = this.subscriptions[index];
			if (index !== -1) {
				this.logger.info(
					"Unsubscribing from subscription",
					subscription,
				);
				this.subscriptions.splice(index, 1);

				// Unsubscribe from the subscription only if previous session & current session are the same
				// Otherwise, just ignore it because it's already unsubscribed
				if (sessionId === subscription.sessionId) {
					return subscription.requestFunctionResponse
						.unsubscribe()
						.catch((error) => {
							if (subscription?.options?.throwOnError)
								throw error;
							this.logger.warn(
								`Failed to unsubscribe from subscription (${id})`,
								error,
							);
						})
						.finally(() => {
							this.logger.info(
								`Successfully unsubscribed from subscription (${id})`,
							);
						});
				}
			}
			return null;
		};
	}

	/**
	 * Wraps a subscription function and returns a new function that can be called with any arguments.
	 * If the wrapped function is not a function, it returns the original value.
	 * @param subscription - The subscription object.
	 * @param key - The key of the function to wrap.
	 * @returns A new function that can be called with any arguments.
	 */
	protected static wrapAnyFunction<T extends BaseSubscribeResponse>(
		subscription: Subscription<T>,
		key: string,
	) {
		return (...args: any[]) => {
			const { requestFunctionResponse } = subscription;
			const value = requestFunctionResponse[key];
			if (typeof value !== "function") return value;
			return value(...args);
		};
	}
}

SubscriptionPool.startMonitor();

export default SubscriptionPool;
