import { Injectable } from '@angular/core';
import {
  asyncScheduler,
  MonoTypeOperatorFunction,
  Observable,
  ReplaySubject,
  Subject,
  Subscriber,
  Subscription
} from 'rxjs';
import { shareReplay, switchMap } from 'rxjs/operators';
import { UserAuthService } from '../../../../core/services/user-auth.service';
import { DataPriorityService } from './data-priority.service';
import { TranslateService } from '@ngx-translate/core';

/**
 * This registry collects all data sources under an individual id
 * and ensures that each of them is loaded only once from the backend.
 * It is provided by the dashboard and is optionally injected by the data source implementations.
 */
@Injectable()
export class DataLoaderRegistryService {
  /**
   * Scheduler on which the expiry timers are scheduled. It is a public, mutable field,
   * so it can be replaced from outside (presumably by tests - confirm).
   */
  scheduler = asyncScheduler;

  // Maps instead of plain object literals: ids are caller-supplied strings, and on a plain
  // object an id such as 'constructor' or 'toString' would resolve to an inherited
  // Object.prototype member instead of "no entry".
  /** Named replay subjects, created lazily by `getReplaySubject`. */
  private readonly namedSubjects = new Map<string, Subject<any>>();
  /** Shared observable per data source id. */
  private readonly registry = new Map<string, Observable<any>>();
  /** Pending expiry timer per data source id. */
  private readonly registryTimers = new Map<string, Subscription>();
  private readonly priorityService: DataPriorityService;

  constructor(private authService: UserAuthService, private translationService: TranslateService) {
    this.priorityService = new DataPriorityService(authService.getMaxParallelRequests());
  }

  /**
   * Creates an operator that routes every subscription through one shared observable per id.
   *
   * The first subscription for an id registers the source, wrapped by the priority service and
   * by `shareReplay` (buffer size 1, ref-counted). While that entry exists, later subscriptions
   * with the same id reuse it, even if they were piped from a different source observable.
   *
   * Unless `timeout` is falsy, every subscription (re)starts a timer that removes the entry once
   * it fires, so the next subscription afterwards registers its own source again. With the
   * default of 1000 the entry therefore lives for one second after the most recent subscription.
   *
   * @param id Identifier shared by all consumers of the same data.
   * @param priority Passed on unchanged to `DataPriorityService.prioritize`.
   * @param timeout Delay handed to the scheduler (milliseconds for the default `asyncScheduler`).
   *   A falsy value (e.g. `0`) starts no timer; a timer that is already pending is left untouched.
   * @returns An operator whose result emits whatever the shared observable for `id` emits.
   */
  registerDataSource<T>(
    id: string,
    priority?: number,
    timeout = 1000
  ): MonoTypeOperatorFunction<T> {
    return (source: Observable<T>) =>
      new Observable((observer: Subscriber<T>) => {
        // Registration happens at subscription time, not when the operator is applied.
        let shared = this.registry.get(id);
        if (!shared) {
          shared = source.pipe(
            this.priorityService.prioritize(id, priority),
            shareReplay({
              bufferSize: 1,
              refCount: true
            })
          );
          this.addId(id, shared);
        }

        if (timeout) {
          this.ensureTimer(id, timeout);
        }

        // Forward all notifications; the returned subscription is the teardown of this observable.
        return shared.subscribe({
          next: (value) => {
            observer.next(value);
          },
          error: (err) => {
            observer.error(err);
          },
          complete: () => {
            observer.complete();
          }
        });
      });
  }

  /**
   * Returns the subject registered under the given id, creating a `ReplaySubject`
   * with a buffer of 1 on first access. Repeated calls with the same id return the same instance.
   */
  getReplaySubject<T>(id: string): Subject<T> {
    let subject = this.namedSubjects.get(id);
    if (!subject) {
      subject = new ReplaySubject<T>(1);
      this.namedSubjects.set(id, subject);
    }
    return subject;
  }

  /**
   * Returns a named subject of producers, where a producer can be published.
   * It is the same subject that `getReplaySubject` returns for this id.
   */
  getProducersSubject<T>(id: string): Subject<Observable<T>> {
    return this.getReplaySubject<Observable<T>>(id);
  }

  /**
   * Returns the data of the producer published under the given id.
   * When a new producer is published, the result switches over to it.
   */
  getProducer<T>(id: string): Observable<T> {
    return this.getProducersSubject<T>(id).pipe(switchMap((p) => p));
  }

  /** Stores the shared observable for an id, replacing any previous entry. */
  private addId(id: string, source: Observable<any>): void {
    this.registry.set(id, source);
  }

  /** Restarts the expiry timer of an id: cancels a pending one and schedules a new one. */
  private ensureTimer(id: string, timeout: number): void {
    this.removeTimer(id);
    const timer = this.scheduler.schedule(() => {
      // The timer has fired, so drop its bookkeeping entry together with the data source.
      this.registryTimers.delete(id);
      this.removeId(id);
    }, timeout);
    this.registryTimers.set(id, timer);
  }

  /** Cancels and forgets the pending expiry timer of an id, if there is one. */
  private removeTimer(id: string): void {
    const timer = this.registryTimers.get(id);
    if (timer) {
      timer.unsubscribe();
      this.registryTimers.delete(id);
    }
  }

  /** Removes the shared observable of an id; does nothing if the id is not registered. */
  private removeId(id: string): void {
    this.registry.delete(id);
  }

  /**
   * Forgets all registered data sources and named subjects and cancels all pending timers.
   * Subjects and observables that were handed out earlier are not completed or unsubscribed here.
   */
  resetRegistry(): void {
    this.registry.clear();
    this.namedSubjects.clear();
    for (const timer of this.registryTimers.values()) {
      timer.unsubscribe();
    }
    this.registryTimers.clear();
  }
}
