import { Injectable } from "@angular/core";
import { io, Socket } from "socket.io-client";
import { Observable, Subscriber, share, filter, takeUntil, finalize } from "rxjs";
import { PUSH_UPDATES_ROUTE, CLIENT_EVENTS, CHANNELS, SERVER_EVENTS, PUB_TYPES, Publication } from "@zixi/models";
import { AuthService } from "src/app/services/auth.service";
import _ from "lodash";

// Re-export of model symbols imported above, making them importable from this module.
export { CHANNELS, PUB_TYPES, Publication };
// NOTE(review): not referenced in the visible code; presumably a refresh throttle in milliseconds — confirm against callers.
export const REFRESH_RATE_LIMIT = 5000;
/** Identifies a push-updates channel to subscribe to. */
interface Channel {
    /** Channel name; sent to the server on (un)subscribe and compared with the `channel` of incoming publications. */
    name: CHANNELS;
    /**
     * Object id sent to the server together with the channel name.
     * When falsy (e.g. null), the publication filter in `subscribeChannel` does not compare object ids.
     */
    objectId: number | null;
}

/**
 * Root-level service that delivers server push updates over a single socket.io connection.
 *
 * The socket is opened on the first channel subscription and disconnected when the shared
 * publication stream loses its last subscriber or the auth service reports a logged-out state.
 */
@Injectable({
    providedIn: "root"
})
export class PushUpdatesService {
    /** Current socket connection, or null while no connection is open. */
    private socket: Socket | null = null;
    /** Multicast stream of every publication received on the socket, or null while no connection is open. */
    private pushUpdates$: Observable<Publication> | null = null;
    /**
     * One entry per subscriber whose subscription was acknowledged by the server.
     * Equal entries may appear several times; the number of equal entries acts as a reference count.
     */
    private activeChannels: Channel[] = [];
    /** Emits every time `authService.isLoggedIn` emits a falsy value. */
    private readonly isLoggedOut$ = this.authService.isLoggedIn.pipe(filter(isLoggedIn => !isLoggedIn));

    constructor(private authService: AuthService) {}

    /**
     * Subscribe to a channel, and return an observable that emits publications from that channel.
     *
     * Nothing happens until the returned observable is subscribed. Each subscription then:
     *  - opens the socket connection if there is none,
     *  - emits a subscribe request to the server,
     *  - forwards publications whose channel (and object id, when `channel.objectId` is truthy) match.
     *
     * In case the server rejects the subscription, the observable completes.
     * After all subscribers of the same channel cancel their subscriptions,
     * the socket emits an unsubscribe request for that channel to the server.
     *
     * @param channel The channel name and optional object id to listen to.
     * @returns A stream of the matching publications.
     */
    subscribeChannel(channel: Channel): Observable<Publication> {
        // The work is deferred to subscription time so that every subscriber gets its own bookkeeping
        // and always uses the current connection, even if an earlier connection was torn down in between.
        return new Observable<Publication>(subscriber => {
            if (!this.socket) {
                this.init();
            }
            const socket = this.socket as Socket;
            const pushUpdates$ = this.pushUpdates$ as Observable<Publication>;

            // True only while THIS subscription owns an entry in activeChannels.
            let isRegistered = false;
            // True once THIS subscription has ended (unsubscribed or completed).
            let isFinalized = false;

            // Emits once if the server rejects the subscription; completes silently otherwise.
            const isSubscriptionFail$ = new Observable<void>(observer => {
                socket.emit(
                    SERVER_EVENTS.SUBSCRIBE,
                    channel.name,
                    channel.objectId,
                    (response: { success: boolean }) => {
                        if (!response.success) {
                            observer.next();
                        } else if (!isFinalized) {
                            // An acknowledgement that arrives after the subscriber left is not registered,
                            // otherwise the entry would never be released and would be re-subscribed on every reconnect.
                            this.addToActiveChannels(channel);
                            isRegistered = true;
                        }
                        observer.complete();
                    }
                );
            });

            return pushUpdates$
                .pipe(
                    // Parentheses matter: `a && b ? c : d` parses as `(a && b) ? c : d`,
                    // which would let publications from every other channel through.
                    filter(
                        pub =>
                            pub.channel === channel.name &&
                            (!channel.objectId || pub.objectId === channel.objectId)
                    ),
                    finalize(() => {
                        isFinalized = true;
                        // Skip when this subscription never registered, or when the connection it registered on
                        // has already been torn down (which also cleared activeChannels).
                        if (!isRegistered || this.socket !== socket) {
                            return;
                        }
                        isRegistered = false;
                        this.removeFromActiveChannels(channel);
                        // Tell the server only when no other subscriber still listens to this channel.
                        if (!this.isActiveChannel(channel) && socket.connected) {
                            socket.emit(SERVER_EVENTS.UNSUBSCRIBE, channel.name, channel.objectId);
                        }
                    }),
                    takeUntil(isSubscriptionFail$)
                )
                .subscribe(subscriber);
        });
    }

    /**
     * Initiate the socket connection.
     * The server emits publications from all the active channels to the push updates observable (multicast).
     * After all the observable subscribers unsubscribe, or a logged-out state is reported,
     * the teardown handler executes: it disconnects the socket and resets the service state.
     */
    private init(): void {
        // Closures below capture this local reference so they keep working on the socket they were created for.
        const socket = io("", {
            path: PUSH_UPDATES_ROUTE,
            withCredentials: true,
            transports: ["websocket"]
        });
        this.socket = socket;

        // After a reconnect, ask the server again for every channel that is still registered.
        socket.io.on("reconnect", () => {
            this.activeChannels.forEach(activeChannel =>
                socket.emit(SERVER_EVENTS.SUBSCRIBE, activeChannel.name, activeChannel.objectId)
            );
        });

        this.pushUpdates$ = new Observable((observer: Subscriber<Publication>) => {
            socket.on(CLIENT_EVENTS.PUSH_UPDATE, (pub: Publication) => {
                observer.next(pub);
            });

            // Teardown: runs when the shared stream loses its last subscriber or completes on logout.
            return () => {
                this.activeChannels = [];
                socket.disconnect();
                this.socket = null;
                this.pushUpdates$ = null;
            };
        }).pipe(takeUntil(this.isLoggedOut$), share());
    }

    /** Whether at least one registered entry deep-equals the given channel. */
    private isActiveChannel(channel: Channel): boolean {
        return this.activeChannels.some(activeChannel => _.isEqual(activeChannel, channel));
    }

    /** Register one acknowledged subscriber for the given channel. */
    private addToActiveChannels(channel: Channel): void {
        this.activeChannels.push(channel);
    }

    /**
     * Release one registered subscriber of the given channel.
     * Only a single matching entry is removed, so other subscribers of the same channel stay registered.
     */
    private removeFromActiveChannels(channel: Channel): void {
        const index = this.activeChannels.findIndex(activeChannel => _.isEqual(activeChannel, channel));
        if (index !== -1) {
            this.activeChannels.splice(index, 1);
        }
    }
}
