import { clamp } from 'lodash-es';
import { MonoTypeOperatorFunction, Observable, Subscriber, Subscription } from 'rxjs';

/**
 * Bookkeeping record for a single subscription that goes through
 * `DataPriorityService.prioritize`.
 */
interface PriorityQueue {
  /** Key under which the entry is stored in the service's queue map. */
  id: string;
  /** Scheduling rank; entries with a lower value are started first. */
  priority: number;
  /** `true` once the service has subscribed to `source` for this entry. */
  inWork: boolean;
  /** Upstream observable that is subscribed to when the entry is started. */
  source: Observable<any>;
  /** Downstream subscriber that receives everything `source` emits. */
  observer: Subscriber<any>;
  /** Subscription to `source`; `null` until the entry has been started. */
  subscription: Subscription | null;
}

/**
 * Limits how many prioritized observables may be "in work" at the same time.
 *
 * An observable piped through {@link DataPriorityService.prioritize} is queued
 * on subscription. The service subscribes to the underlying source only while
 * a slot is available, picking the waiting entry with the lowest `priority`
 * value first (ties are served in registration order). A slot is occupied
 * from the moment the source is subscribed until it emits its first value,
 * errors, completes, or the consumer unsubscribes.
 */
export class DataPriorityService {
  /** Priority used by `prioritize` when the caller does not pass one. */
  static readonly defaultPriority = 1000;

  /** Maximum number of entries that may be in work simultaneously. */
  private readonly queueSize: number;

  /** Number of entries currently occupying a slot. */
  get inWorkQueue(): number {
    return this._inWorkQueue;
  }

  /** Stores the value clamped to the range `[0, queueSize]`. */
  set inWorkQueue(value: number) {
    this._inWorkQueue = clamp(value, 0, this.queueSize);
  }

  private _inWorkQueue = 0;

  /** Live map of all registered entries (waiting and in work), keyed by id. */
  get priorityQueue(): Map<string, PriorityQueue> {
    return this._priorityQueue;
  }

  private _priorityQueue = new Map<string, PriorityQueue>();

  /**
   * @param queueSize Maximum number of concurrently running entries; defaults to 10.
   */
  constructor(queueSize?: number) {
    this.queueSize = queueSize ?? 10;
  }

  /**
   * Creates an operator that defers subscribing to the source until the
   * service has a free slot for it.
   *
   * NOTE: if another entry with the same `id` is still registered, the new
   * subscription is not queued and therefore never receives values.
   *
   * @param id Key identifying the entry in the queue.
   * @param priority Scheduling rank; lower values are started first.
   * @returns An operator that mirrors the source once it has been started.
   */
  prioritize<T>(
    id: string,
    priority = DataPriorityService.defaultPriority
  ): MonoTypeOperatorFunction<T> {
    return (source: Observable<T>) =>
      new Observable((observer: Subscriber<T>) => {
        const queueItem: PriorityQueue = {
          id,
          priority,
          inWork: false,
          source,
          observer,
          subscription: null
        };
        this.register(queueItem);
        this.getNext();
        return () => {
          queueItem.subscription?.unsubscribe();
          // Cancelling an entry that occupied a slot makes room for a waiting one.
          if (this.unregister(queueItem)) {
            this.getNext();
          }
        };
      });
  }

  /** @returns Number of slots that are currently free. */
  getQueueCapacity(): number {
    return this.queueSize - this.inWorkQueue;
  }

  /** Adds the entry to the queue unless its id is already registered. */
  private register(queueItem: PriorityQueue): void {
    if (this._priorityQueue.has(queueItem.id)) {
      return;
    }
    this._priorityQueue.set(queueItem.id, queueItem);
  }

  /**
   * Removes the entry from the queue and releases its slot if it held one.
   *
   * The entry is matched by identity so that a subscriber whose registration
   * was skipped (duplicate id) cannot evict the entry that owns the id.
   *
   * @returns `true` when a slot was freed, `false` otherwise.
   */
  private unregister(queueItem: PriorityQueue): boolean {
    if (this._priorityQueue.get(queueItem.id) !== queueItem) {
      return false;
    }
    this._priorityQueue.delete(queueItem.id);
    // Entries that were still waiting never occupied a slot.
    if (!queueItem.inWork) {
      return false;
    }
    this.inWorkQueue -= 1;
    return true;
  }

  /** Starts waiting entries, best priority first, until no slot is free. */
  private getNext(): void {
    while (this.getQueueCapacity() > 0) {
      const next = this.nextHighestPriorityItem();
      if (!next) {
        return;
      }
      // Claim the slot before subscribing: a source that emits synchronously
      // releases it again from inside `subscribe`.
      next.inWork = true;
      this.inWorkQueue += 1;
      const subscription = this.subscribeToNext(next);
      next.subscription = subscription;
      // The consumer may have unsubscribed during a synchronous emission,
      // before the subscription handle was available to its teardown.
      if (next.observer.closed) {
        subscription.unsubscribe();
      }
    }
  }

  /**
   * @returns The waiting entry with the lowest priority value (the first
   * registered one on ties), or `undefined` when nothing is waiting.
   */
  private nextHighestPriorityItem(): PriorityQueue | undefined {
    let best: PriorityQueue | undefined;
    this._priorityQueue.forEach((candidate) => {
      if (candidate.inWork) {
        return;
      }
      if (best === undefined || candidate.priority < best.priority) {
        best = candidate;
      }
    });
    return best;
  }

  /**
   * Subscribes to the entry's source and forwards all notifications to its
   * observer. The slot is released on the first notification of any kind,
   * after which the next waiting entry is started.
   */
  private subscribeToNext(queueItem: PriorityQueue): Subscription {
    return queueItem.source.subscribe({
      next: (value) => {
        // Only the first emission still holds a slot; later ones are no-ops here.
        const freedSlot = this.unregister(queueItem);
        queueItem.observer.next(value);
        if (freedSlot) {
          this.getNext();
        }
      },
      error: (err) => {
        this.unregister(queueItem);
        queueItem.observer.error(err);
        this.getNext();
      },
      complete: () => {
        // A source that completes without emitting must still hand over its slot.
        this.unregister(queueItem);
        queueItem.observer.complete();
        this.getNext();
      }
    });
  }
}
