import { Injectable } from '@angular/core';
import { Store } from '@ngrx/store';
import { Summary } from '@octiga/microsoft-events/src/interfaces/summary.interface';
import { combineLatest, interval, Subject } from 'rxjs';
import { buffer, filter, first, map, mergeMap, take, tap } from 'rxjs/operators';
import { client } from 'src/app/stores/client';
import { upsertAlerts } from '../stores/client/alert/actions';
import { AlertItem } from '../stores/client/alert/model';
import { upsertSummarys } from '../stores/client/etl/summary/actions';
import { upsertUpdateDateLists } from '../stores/client/etl/update-date-list/actions';
import { BrotliService } from './brotli.service';
import { TokenService } from './token.service';

@Injectable({
    providedIn: 'root',
})
export class WebsocketService {
    private summary_update$ = new Subject<Summary>();
    private alert_update$ = new Subject<AlertItem>();

    constructor(
        private store: Store,
        private brotliService: BrotliService,
        private tokenService: TokenService,
    ) {}

    public init() {
        this.subscribers();
        this.connect();
    }

    private connect() {
        this.tokenService
            .getToken()
            .pipe(first((token) => token != null))
            .subscribe((token) => this.handlers(token));
    }

    private handlers(token?: string) {
        const ws = new WebSocket(
            `${window.location.protocol === 'http:' ? 'ws' : 'wss'}://${window.location.host}/wss?token=${token}`,
        );

        ws.onopen = (event) => {
            console.log('[wss] open');
        };

        ws.onmessage = (event) => {
            const message = JSON.parse(event.data);
            if (message.type === 'summary') {
                this.summary_update$.next(message.data);
            } else if (message.type === 'alert') {
                this.alert_update$.next(message.data);
            }
        };

        ws.onerror = (event) => {
            console.log('[wss] error:', event);
        };

        ws.onclose = (event) => {
            console.log('[wss] closed:', event);
            setTimeout(() => this.handlers(token), 3000);
        };
    }

    private decompress() {
        return mergeMap((data: any) =>
            combineLatest(data.map((item) => this.brotliService.decompress(item.data))).pipe(
                map((results) => data.map((item, i) => ({ ...item, data: results[i] })) as Summary[]),
            ),
        );
    }

    private subscribers() {
        this.summary_update$
            .pipe(
                buffer(interval(5000)),
                filter((i) => i.length > 0),
                this.decompress(),
                tap((buff) => console.log(`[wss] upserting ${buff.length} summarys`)),
                map((summaries) => {
                    return summaries.reduce((acc, item) => {
                        if (acc.has(item.tenant)) {
                            acc.get(item.tenant).push(item);
                        } else {
                            acc.set(item.tenant, [item]);
                        }
                        return acc;
                    }, new Map<string, Summary[]>());
                }),
            )
            .subscribe((summaries) => {
                for (const [_tenant, summarys] of summaries) {
                    this.store.dispatch(upsertSummarys({ _tenant, summarys }));

                    // TODO: refactor this system away by using summary 'status' field instead
                    this.store
                        .select(client(_tenant).summary.updateDateList.all)
                        .pipe(take(1))
                        .subscribe((datelists) => {
                            const filteredDateLists = datelists
                                .filter((datelist) => summarys.some((s) => s.start === datelist.start))
                                .map((fdl) => ({ ...fdl, updated: true }));
                            if (filteredDateLists.length > 0) {
                                this.store.dispatch(
                                    upsertUpdateDateLists({ _tenant, updateDateLists: filteredDateLists }),
                                );
                            }
                        });
                }
            });

        this.alert_update$
            .pipe(
                buffer(interval(10000)),
                filter((i) => i.length > 0),
                tap((buff) => console.log(`[wss] upserting ${buff.length} alerts`)),
                map((alerts) => {
                    return alerts.reduce((acc, item) => {
                        if (acc.has(item.tenant_id)) {
                            acc.get(item.tenant_id).push(item);
                        } else {
                            acc.set(item.tenant_id, [item]);
                        }
                        return acc;
                    }, new Map<string, AlertItem[]>());
                }),
            )
            .subscribe((items) => {
                for (const [_tenant, alerts] of items) {
                    this.store.dispatch(upsertAlerts({ _tenant, alerts }));
                }
            });
    }
}
