import { HubConnectionBuilder, HubConnection } from "@microsoft/signalr";
import { MessagePackHubProtocol } from "@microsoft/signalr-protocol-msgpack";
import { Subscriber, Subscription, Stream } from "./message-hub.types";

export class MessageStreamHub<StreamName extends string> {
    private connection!: HubConnection;
    private streams = new Map<StreamName, Stream<unknown>>();
    // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
    private subscriptions = new Map<StreamName, Subscription<any>[]>();
    private isInitialized = false;

    constructor(private url: string, private useMessagePack: boolean = true) {
        this.buildConnection();
    }

    public async init() {
        try {
            await this.connection.start();
            this.isInitialized = true;
        } catch (e) {}
    }

    public async destroy() {
        this.disposeSubscriptions();
        this.subscriptions.clear();
        this.closeStreams();
        this.streams.clear();
        this.connection.stop();
    }

    public nextRetryDelayInMilliseconds() {
        return 2000;
    }

    public unsubscribe<T>(streamName: StreamName, subscriber: Subscriber<T>) {
        const subscriptions = this.subscriptions.get(streamName);
        if (!subscriptions) return;
        let index = 0;
        for (const subscription of subscriptions) {
            if (subscription.subscriber !== subscriber) {
                index++;
                continue;
            }
            subscription.disposer.dispose();
            subscriptions.splice(index, 1);
            break;
        }
    }

    public subscribe<T>(streamName: StreamName, subscriber: Subscriber<T>) {
        if (!this.isInitialized) {
            throw new Error("Initialize hub before subscribe!");
        }
        const stream = this.streams.get(streamName)!;
        const disposer = stream.subscribe(this.createSubscriberShape(subscriber));
        const subscription: Subscription<T> = {
            disposer,
            subscriber,
        };
        const existingSubs = this.subscriptions.get(streamName);
        this.subscriptions.set(streamName, existingSubs ? [...existingSubs, subscription] : [subscription]);
    }

    public createStream(streamName: StreamName, ...args: any[]) {
        let stream;
        if (args && args.length > 0) {
            stream = this.connection.stream(streamName, ...args);
        } else {
            stream = this.connection.stream(streamName, null);
        }
        this.streams.set(streamName, stream);
    }

    public getStream(streamName: StreamName) {
        return this.streams.get(streamName);
    }

    private reconnect = () => {
        for (const [streamName, subscription] of this.subscriptions) {
            this.createStream(streamName);
            const stream = this.streams.get(streamName)!;
            for (let data of subscription) {
                const disposer = stream.subscribe(this.createSubscriberShape(data.subscriber));
                data.disposer.dispose();
                data = {
                    disposer,
                    subscriber: data.subscriber,
                };
                data.subscriber.onReconnect?.();
            }
        }
    };

    private disposeSubscriptions() {
        this.subscriptions.forEach((subscriptions) => {
            subscriptions.forEach((sub) => sub.disposer.dispose());
        });
    }

    private closeStreams() {
        for (const [streamName] of this.streams) {
            this.connection.off(streamName);
        }
    }

    private buildConnection() {
        const connection = new HubConnectionBuilder().withUrl(this.url).withAutomaticReconnect(this);
        if (this.useMessagePack) connection.withHubProtocol(new MessagePackHubProtocol());
        this.connection = connection.build();
        this.connection.onreconnected(this.reconnect);
    }

    private createSubscriberShape<T>(subscriber: Subscriber<T>) {
        return {
            next: subscriber.onMessage.bind(subscriber),
            error: console.error,
            complete: console.info,
        };
    }
}
