import { BehaviorSubject, Observable, Subject, filter } from 'rxjs';
import { BroadcastMessage } from './broadcast-message';
import { RxEvent } from '@shared/channel/rx-event';
import { NgZone, inject } from '@angular/core';
/**
 * Relays messages between an `RxEvent` instance and RxJS subscribers.
 *
 * `publish` emits a message through `RxEvent` under the fixed `MESSAGE_TYPE`
 * key. `messagesOfType` starts reading the `RxEvent` stream for that same key,
 * pushes every chunk's `data` into an internal `BehaviorSubject`, and returns
 * that subject filtered by the requested message `type`.
 */
export class BroadcastService {
  /** Sink for every message read from the stream; seeded with an empty message `{}`. */
  private broadcastSubject = new BehaviorSubject<BroadcastMessage>({});

  /** Not referenced by any method of this class as written. */
  private onDestroy = new Subject<void>();

  /** Key used for both `RxEvent.emit` and `RxEvent.read`. */
  private static MESSAGE_TYPE = 'DESKTOP';

  /**
   * While `true`, `readStream` keeps reading until the stream reports `done`.
   * When set to `false`, `readStream` performs a single read and returns.
   */
  static INFINITY_READ_STREAM = true;

  /**
   * @param broadcastChannelName Accepted but not used by this class as written.
   * @param ngZone Stored as a private member; not used by this class as written.
   */
  constructor(
    broadcastChannelName: string,
    private ngZone: NgZone
  ) {
  }

  // NOTE(review): `inject()` in a field initializer presumably requires this
  // class to be instantiated inside an Angular injection context — confirm
  // against how the service is provided.
  private rxEvent: RxEvent = inject(RxEvent);

  /**
   * Emits `message` through `RxEvent` under `MESSAGE_TYPE`.
   *
   * @param message The message to broadcast.
   */
  publish(message: BroadcastMessage): void {
    this.rxEvent.emit(BroadcastService.MESSAGE_TYPE, message);
  }

  /**
   * Starts a background read of the `RxEvent` stream and returns an observable
   * of the messages whose `type` property equals `type`.
   *
   * NOTE(review): every call starts another read loop. If `RxEvent.read`
   * hands out independent streams that carry the same messages, subscribers
   * would presumably receive duplicates — confirm against `RxEvent`.
   *
   * @param type Value compared (strictly) against each message's `type`.
   * @returns The internal subject as an observable, filtered by `type`.
   */
  messagesOfType(type: string): Observable<BroadcastMessage> {
    // Fire-and-forget: the read loop runs in the background and is not awaited.
    // Returning the loop's promise keeps read failures on this single chain.
    void this.rxEvent
      .read(BroadcastService.MESSAGE_TYPE)
      .then((read) => this.readStream(read.stream));

    return this.broadcastSubject.asObservable().pipe(
      filter((message) => message.type === type)
    );
  }

  /**
   * Reads chunks from `stream` and forwards each chunk's `data` to
   * `broadcastSubject`.
   *
   * The loop ends when the stream reports `done`, or after a single read when
   * `INFINITY_READ_STREAM` is `false`. The reader lock is released on exit.
   *
   * @param stream Stream whose chunks expose the message on a `data` property.
   */
  private async readStream(stream: ReadableStream): Promise<void> {
    const reader = stream.getReader();
    try {
      do {
        const result = await reader.read();
        if (result.done) {
          // A finished stream resolves every later read() immediately with
          // `done: true`; looping on would spin without ever yielding.
          break;
        }
        this.broadcastSubject.next(result.value.data);
      } while (BroadcastService.INFINITY_READ_STREAM);
    } finally {
      // No read is pending here, so the lock can be released safely.
      reader.releaseLock();
    }
  }
}
