import { Injectable } from "@angular/core";
import { HttpClient } from "@angular/common/http";
import { Observable, ReplaySubject } from "rxjs";
import { map, share } from "rxjs/operators";

import { TranslateService } from "@ngx-translate/core";
import { Constants } from "../../../../constants/constants";
import { KafkaConfig } from "../../../../models/shared";

import { AuthService } from "src/app/services/auth.service";

/**
 * Client-side cache and CRUD facade for Kafka configurations.
 *
 * The service keeps the last known list of configurations in an in-memory
 * store and publishes it through {@link KafkaConfigsService.kafkaConfigs},
 * which is backed by a `ReplaySubject` with a buffer of one so late
 * subscribers receive the most recently published list.
 *
 * The store and the subject are re-created whenever `AuthService.isLoggedIn`
 * emits a falsy value. Note that `kafkaConfigs` is replaced with a new
 * observable at that point; subscriptions made to the previous observable
 * will not receive values published afterwards.
 */
@Injectable({
    providedIn: "root"
})
export class KafkaConfigsService {
    /** Stream of the cached Kafka configurations (replays the latest list). */
    kafkaConfigs: Observable<KafkaConfig[]>;
    private kafkaConfigs$: ReplaySubject<KafkaConfig[]>;
    private dataStore: {
        configs: KafkaConfig[];
    };

    constructor(private authService: AuthService, private http: HttpClient, private translate: TranslateService) {
        this.reset();

        // Drop all cached data as soon as the user is reported as logged out.
        this.authService.isLoggedIn.subscribe(isLoggedIn => {
            if (!isLoggedIn) this.reset();
        });
    }

    /** Empties the store and creates a fresh subject/observable pair. */
    private reset() {
        this.dataStore = {
            configs: []
        };

        this.kafkaConfigs$ = new ReplaySubject<KafkaConfig[]>(1);
        this.kafkaConfigs = this.kafkaConfigs$.asObservable();
    }

    /**
     * Builds an API URL under the Kafka configuration endpoint.
     *
     * @param suffix Optional path appended verbatim after the base endpoint.
     */
    private kafkaUrl(suffix = ""): string {
        return Constants.apiUrl + Constants.apiUrls.configuration.kafka + suffix;
    }

    /**
     * Publishes the current store content to subscribers.
     *
     * The store array itself is emitted (not a copy), so subscribers observe
     * later in-place mutations of the same array instance.
     */
    private publish(): void {
        this.kafkaConfigs$.next(this.dataStore.configs);
    }

    /** Hook for normalising a configuration before it is stored; currently a no-op. */
    private prepKafkaConfig(item: KafkaConfig) {
        return item;
    }

    /**
     * Inserts or updates a configuration in the store, matched by `id`.
     *
     * @param kafkaConfig Configuration received from the API.
     * @param merge When `true`, the existing object is kept and the new fields
     * are copied onto it (preserving object identity for consumers); when
     * `false`, the stored entry is replaced by `kafkaConfig`.
     */
    private updateStore(kafkaConfig: KafkaConfig, merge: boolean): void {
        this.prepKafkaConfig(kafkaConfig);

        const currentIndex = this.dataStore.configs.findIndex(g => g.id === kafkaConfig.id);
        if (currentIndex === -1) {
            this.dataStore.configs.push(kafkaConfig);
        } else if (merge) {
            Object.assign(this.dataStore.configs[currentIndex], kafkaConfig);
        } else {
            this.dataStore.configs[currentIndex] = kafkaConfig;
        }
    }

    /**
     * Reloads all Kafka configurations from the API and synchronises the store.
     *
     * The request is started immediately by an internal subscription; the
     * returned observable shares that request while it is in flight.
     *
     * @returns Observable emitting the list returned by the API.
     */
    refreshKafkaConfigs(): Observable<KafkaConfig[]> {
        const kafkaConfigs = this.http
            .get<{ result: KafkaConfig[]; success: boolean }>(this.kafkaUrl())
            .pipe(share());

        kafkaConfigs.subscribe(
            data => {
                const kafkas = data.result;

                // Remove entries that no longer exist on the server. Iterate
                // backwards: splicing while walking forwards shifts the next
                // element into the current slot and causes it to be skipped.
                for (let i = this.dataStore.configs.length - 1; i >= 0; i--) {
                    const existing = this.dataStore.configs[i];
                    if (!kafkas.some(tp => tp.id === existing.id)) this.dataStore.configs.splice(i, 1);
                }

                kafkas.forEach(tp => this.updateStore(tp, true));

                this.publish();
            },
            // NOTE(review): the translation key refers to transcoding profiles;
            // it looks copy-pasted from another service - confirm the intended key.
            // eslint-disable-next-line no-console
            error => console.log(this.translate.instant("API_ERRORS.COULD_NOT_LOAD_TRANSCODING_PROFILES"), error)
        );

        return kafkaConfigs.pipe(map(r => r.result));
    }

    /**
     * Asks the API to test the given Kafka configuration.
     *
     * Never rejects: failures (including transport errors) are reported
     * through the returned object.
     *
     * @param model Configuration whose `id` identifies what to test.
     * @returns Outcome flag plus a human readable message.
     */
    async testKafkaConfig(model: KafkaConfig): Promise<{ success: boolean; message: string }> {
        const fallbackMessage = "Failed testing Kafka configuration";

        try {
            const result = await this.http
                .get<{ success: boolean; result?: number; error?: string }>(
                    this.kafkaUrl("/" + model.id + "/test")
                )
                .toPromise();

            if (!result.success) return { success: false, message: result.error || fallbackMessage };
            return { success: true, message: "Test message sent successfully" };
        } catch (error) {
            // The caught value is not guaranteed to carry a parsed API body,
            // so walk the chain defensively instead of letting the catch
            // block itself throw.
            const apiMessage = error && error.error && error.error.error;
            return { success: false, message: apiMessage || fallbackMessage };
        }
    }

    /**
     * Creates a Kafka configuration and adds it to the store.
     *
     * @param model Request payload sent to the API.
     * @returns The created configuration, or `false` when the request fails
     * or the response carries no configuration.
     */
    async createKafkaConfig(model: Record<string, unknown>): Promise<KafkaConfig | false> {
        try {
            const result = await this.http
                .post<{ success: boolean; result: KafkaConfig }>(this.kafkaUrl(), model)
                .toPromise();

            const kafkaConfig = result.result;
            // Never store (or return) an empty entry when the API omits the result.
            if (!kafkaConfig) return false;

            this.updateStore(kafkaConfig, false);
            this.publish();
            return kafkaConfig;
        } catch (error) {
            return false;
        }
    }

    /**
     * Updates an existing Kafka configuration and replaces it in the store.
     *
     * @param kafka Configuration to update; only its `id` is used for the URL.
     * @param model Request payload sent to the API.
     * @returns The updated configuration, or `false` when the request fails
     * or the response carries no configuration.
     */
    async updateKafkaConfig(kafka: KafkaConfig, model: Record<string, unknown>): Promise<KafkaConfig | false> {
        try {
            const result = await this.http
                .put<{ success: boolean; result: KafkaConfig }>(this.kafkaUrl("/" + kafka.id), model)
                .toPromise();

            const kafkaConfig = result.result;
            // Never store (or return) an empty entry when the API omits the result.
            if (!kafkaConfig) return false;

            this.updateStore(kafkaConfig, false);
            this.publish();
            return kafkaConfig;
        } catch (error) {
            return false;
        }
    }

    /**
     * Deletes a Kafka configuration and removes it from the store.
     *
     * @param kafkaConfig The configuration or its numeric id.
     * @returns `true` when the request succeeded, `false` when it threw.
     */
    async deleteKafkaConfig(kafkaConfig: KafkaConfig | number): Promise<boolean> {
        try {
            const id = typeof kafkaConfig === "number" ? kafkaConfig : kafkaConfig.id;
            const result = await this.http
                .delete<{ success: boolean; result: number }>(this.kafkaUrl("/" + `${id}`))
                .toPromise();

            // The id reported by the API decides which cached entry is dropped.
            const deletedId: number = result.result;

            const index = this.dataStore.configs.findIndex(tp => tp.id === deletedId);
            if (index !== -1) this.dataStore.configs.splice(index, 1);

            this.publish();

            return true;
        } catch (error) {
            return false;
        }
    }
}
