import { Injectable } from '@angular/core';
import { Store } from '@ngrx/store';
import { QueueingSubject } from '@dry7/queueing-subject';
import { Subscription } from 'rxjs';
import websocketConnect from 'rxjs-websockets';
import { distinctUntilChanged, map, skip } from 'rxjs/operators';
import {
  ReceiveMessage,
  SendMessage,
  SocketConnect,
  SocketConnectSuccess,
  SocketDisconnect
} from '../../../../store/actions/socket.action';
import { AppState } from '../../../../store/reducers';
import { AuthenticationService } from '../authentication/authentication.service';

/**
 * Connects the application's WebSocket to the NgRx store.
 *
 * - Dispatches `SocketConnect` / `SocketDisconnect` when the authentication
 *   state changes (see `init`).
 * - Opens the socket, sends a `login` message once it reports a live
 *   connection, and forwards every parsed incoming message to the store as
 *   `ReceiveMessage` (see `connect`).
 * - Serialises outgoing messages to JSON and pushes them to the socket input
 *   subject (see `sendMessage`).
 */
@Injectable({
  providedIn: 'root'
})
export class SocketService {
  /**
   * Outgoing message stream handed to `websocketConnect`.
   * NOTE(review): `QueueingSubject` presumably buffers values pushed while no
   * connection is subscribed — confirm against the queueing-subject docs.
   */
  private inputSubject = new QueueingSubject<string>();
  /** Subscription to the auth state created by `init`, if any. */
  private authStateSub?: Subscription;
  /** Subscription to the socket's connection status for the current connection. */
  private connectionStatusSub?: Subscription;
  /** Subscription to the socket's incoming messages for the current connection. */
  private messagesSub?: Subscription;

  constructor(
    private auth: AuthenticationService,
    private store: Store<AppState>
  ) {
  }

  /**
   * Starts reacting to authentication changes: a truthy auth state dispatches
   * `SocketConnect`, a falsy one dispatches `SocketDisconnect`. Consecutive
   * identical states are ignored.
   *
   * Calling this more than once replaces the previous subscription instead of
   * stacking a new one, so each auth change is dispatched exactly once.
   */
  init(): void {
    if (this.authStateSub) {
      this.authStateSub.unsubscribe();
    }
    this.authStateSub = this.auth.authStateChange
      .pipe(distinctUntilChanged())
      .subscribe(flag => {
        if (flag) {
          this.store.dispatch(new SocketConnect());
        } else {
          this.store.dispatch(new SocketDisconnect());
        }
      });
  }

  /**
   * Tears down the subscriptions of the current connection (status and
   * messages) and forgets their handles. Safe to call when nothing is
   * connected. The auth-state subscription created by `init` is left alone.
   */
  clear(): void {
    if (this.connectionStatusSub) {
      this.connectionStatusSub.unsubscribe();
      this.connectionStatusSub = undefined;
    }
    if (this.messagesSub) {
      this.messagesSub.unsubscribe();
      this.messagesSub = undefined;
    }
  }

  /**
   * Opens a new WebSocket connection, dropping any previous one first.
   *
   * Once the connection status turns truthy, a `login` message carrying the
   * plain access token is dispatched via `SendMessage`, followed by
   * `SocketConnectSuccess`. Incoming text frames are JSON-parsed and
   * dispatched as `ReceiveMessage`. Any stream error — including a frame that
   * is not valid JSON, since `JSON.parse` throws inside the pipeline —
   * dispatches `SocketDisconnect`.
   */
  connect(): void {
    this.clear();
    const {messages, connectionStatus} = websocketConnect(
      this.getWebSocketUrl(),
      this.inputSubject
    );
    this.connectionStatusSub = connectionStatus
      // Drop the first emission.
      // NOTE(review): presumably the initial "not connected" value — confirm
      // against the rxjs-websockets documentation.
      .pipe(skip(1))
      .subscribe(status => {
        // Only a truthy status means the socket is actually up. A falsy status
        // (connection lost) must not be reported to the store as a success.
        if (!status) {
          return;
        }
        this.store.dispatch(
          new SendMessage({
            type: 'login',
            token: this.auth.getPlainAccessToken()
          })
        );
        this.store.dispatch(new SocketConnectSuccess());
      });
    this.messagesSub = messages
      .pipe(map((text: string) => JSON.parse(text)))
      .subscribe({
        next: message => this.store.dispatch(new ReceiveMessage(message)),
        error: () => this.store.dispatch(new SocketDisconnect())
      });
  }

  /** Closes the current connection by releasing its subscriptions. */
  disconnect(): void {
    this.clear();
  }

  /**
   * Serialises `message` to JSON and pushes it onto the outgoing stream.
   *
   * @param message Any JSON-serialisable value.
   */
  sendMessage(message: unknown): void {
    this.inputSubject.next(JSON.stringify(message));
  }

  /**
   * Builds the socket URL from the page origin by swapping the leading `http`
   * for `ws` (so `https` becomes `wss`) and appending `/websocket/`.
   */
  private getWebSocketUrl(): string {
    return location.origin.replace(/^http/, 'ws') + '/websocket/';
  }
}
