import { OnDestroy } from '@angular/core';
import { BehaviorSubject, combineLatest, Observable, Subscription } from 'rxjs';
import { finalize, take } from 'rxjs/operators';
import { Cleanupable } from '../../classes';
import {
  IStreamingLib,
  IStreamingLibService,
  IStreamingParticipant,
  SessionBase,
  TwilioTokenResponse,
} from '../../interfaces';
import {
  AudioStreamService,
  VideoConstraints,
  VideoStream,
  VideoStreamService,
} from '../../media';
import { AudioStream } from '../../media/interfaces/audio-stream.interface';
import { NetworkQualityService } from '../network-quality/network-quality.service';
import { SessionBaseService } from '../session/session-base.service';
import { SessionConfigService } from '../session/session-config.service';

export abstract class BaseStreamingService<
  SubjectSessionClass extends SessionBase
> extends Cleanupable implements OnDestroy {
  public videoStreamingInstance$ = new BehaviorSubject<IStreamingLib>(null);
  public get videoStreamingInstance() {
    return this.videoStreamingInstance$.value;
  }

  public abstract remoteParticipants$: Observable<IStreamingParticipant[]>;

  twilioParticipantsSubscription: Subscription;
  currentVideoStream: VideoStream;
  currentAudioStream: AudioStream;
  currentToken: TwilioTokenResponse;
  connectionStatus = false;

  constructor(
    protected audioSource: AudioStreamService,
    protected videoSource: VideoStreamService,
    protected twilioProvider: IStreamingLibService,
    protected sessionInfo: SessionBaseService<SubjectSessionClass>,
    protected sessionConfig: SessionConfigService,
    protected networkQualityService: NetworkQualityService
  ) {
    super();
    this.subscriptions.push(
      this.audioSource.audio$.pipe(
        finalize(() => this.audioSource.closeStream())
      ).subscribe(async (audioStream) => {
        this.currentAudioStream = audioStream;
        await this.handleAudioSourceChange();
      }),
      this.videoSource.video$.subscribe((videoStream) => {
        this.currentVideoStream = videoStream;
        this.handleVideoSourceChange();
      }),
      this.videoSource.currentConstraints$.subscribe((newConstraints) => {
        this.handleVideoConstraintsChange(newConstraints);
      })
    );
  }

  abstract getTokenInfo(sessionId: number): Promise<TwilioTokenResponse>;

  async joinVideo() {
    await this.sessionInfo.waitForInitialize();
    await this.updateToken();

    if (this.videoStreamingInstance) {
      this.videoStreamingInstance.disconnect();
      this.sessionInfo.removeAllParticipants();
    }
    if (this.twilioParticipantsSubscription) {
      this.twilioParticipantsSubscription.unsubscribe();
    }
    this.videoStreamingInstance$.next(
      await this.twilioProvider.getStreaming(
        this.currentToken.token,
        this.sessionInfo.session.session_id.toString()
      )
    );
    await this.handleAudioSourceChange(true);
    this.handleVideoSourceChange();
    this.twilioParticipantsSubscription = combineLatest([
      this.videoStreamingInstance.myParticipant$,
      this.videoStreamingInstance.remoteParticipants$,
    ]).subscribe(([myParticipant, remoteParticipants]) => {
      this.sessionInfo.setNewParticipants(
        myParticipant,
        remoteParticipants,
        this.videoStreamingInstance
      );
    });
    this.subscriptions.push(this.twilioParticipantsSubscription);
    //network quality subcriptions
    this.subscriptions.push(
      this.videoStreamingInstance.networkQuality$.subscribe((networklevel) => {
        this.networkQualityService.setNetworkQuality(networklevel);
      })
    );
    //twilio connection status
    this.subscriptions.push(
      this.videoStreamingInstance.connectionStatus$.subscribe((status) => {
        this.connectionStatus = status;
      })
    );
  }

  disconnectVideo() {
    if (this.videoStreamingInstance) {
      this.videoStreamingInstance.disconnect();
    }
  }

  async handleAudioSourceChange(startIfInactive: boolean = false) {
    if (this.videoStreamingInstance) {
      if (this.currentAudioStream?.stream?.active) {
        await this.videoStreamingInstance.broadcastAudio(
          this.currentAudioStream.track
        );
      } else if (
        startIfInactive &&
        this.currentAudioStream &&
        !this.currentAudioStream.error
      ) {
        this.sessionConfig.setNewAudioDevice(this.currentAudioStream.device);
      }
    }
  }

  private handleVideoConstraintsChange(newConstraints: VideoConstraints) {
    if (
      newConstraints &&
      this.currentVideoStream?.stream?.active &&
      this.videoStreamingInstance
    ) {
      this.videoStreamingInstance.changeVideoParameters(newConstraints);
    }
  }

  private async handleVideoSourceChange() {
    if (this.videoStreamingInstance) {
      if (this.currentVideoStream?.stream?.active) {
        this.videoStreamingInstance.broadcastVideo(
          this.currentVideoStream.track
        );
      }
    }
  }

  private async updateToken() {
    try {
      this.currentToken = await this.getTokenInfo(
        this.sessionInfo.session.session_id
      );
    } catch (error) {
      console.error('Can not join the room', error);
      if (
        error.message !== 'You are already in this room using another browser'
      ) {
        throw error;
      }
    }
  }

  ngOnDestroy() {
    this.videoStreamingInstance$.pipe(
      take(1)
    ).subscribe(instance => {
      console.log('disconnecting');
      if (instance) instance.disconnect();
    })
  }
}
