import { Injectable } from '@angular/core';
import {IMqttMessage, IMqttServiceOptions, IOnMessageEvent, MqttConnectionState, MqttService} from 'ngx-mqtt';
import {Subject, Subscription} from 'rxjs';
import {filter, map} from 'rxjs/operators';
import {environment} from "../../environments/environment";

/**
 * Kinds of events carried in {@link MQTTEmitEvent.name}.
 *
 * This is a numeric enum: the explicit values below are the same ordinals
 * the compiler would assign implicitly, spelled out so they stay stable if
 * members are ever reordered.
 */
export enum MQTTEvents {
  /** The connection state reported by the MQTT client changed. */
  onConnectionStateChange = 0,
  /** A message arrived from the broker. */
  onNewMessage = 1,
}

/**
 * Envelope for a single event: a discriminating event kind plus an
 * event-specific payload.
 */
export class MQTTEmitEvent {
  /** Which kind of event this is. */
  public name: MQTTEvents;

  /** Payload of the event; its shape depends on `name`. */
  public value: any;
}

/**
 * A topic/message pair describing one outgoing MQTT publish.
 */
export interface IMQTTCommand {
  /** Topic the message is addressed to. */
  topic: string,
  /** Message body, as a string. */
  message: string,
}

/**
 * Application-wide facade over ngx-mqtt's `MqttService`.
 *
 * It tracks whether the client is connected (`isConnected`), republishes
 * connection-state changes and incoming messages as {@link MQTTEmitEvent}s
 * that callers consume through {@link on}, keeps a subscription to the
 * configured topic filter open while connected, and offers helpers to
 * connect to a host and publish messages.
 */
@Injectable({
  providedIn: 'root'
})
export class MqttWrapperService {
  /** Internal bus; everything passed to `emit` flows through here. */
  private subject$ = new Subject<MQTTEmitEvent>();
  /** Every subscription this service owns, released together on teardown. */
  private subscriptions: Subscription[] = [];
  /** The single subscription to the client's connection state, once created. */
  private stateSubscription?: Subscription;
  /** The single subscription to `topic`, once created. */
  private topicSubscription?: Subscription;
  /** Topic filter observed while connected. */
  private topic: string = '#';

  /** True while the last reported connection state was CONNECTED. */
  isConnected: boolean = false;

  constructor(private _mqttService: MqttService) {
    this.mqttSubscribeEvents();
  }

  /**
   * Releases every subscription held by the service and completes the
   * internal event bus. Angular invokes this hook on injectable services
   * when the injector that owns them is destroyed.
   */
  ngOnDestroy(): void {
    this.mqttUnsubscribeEvents();
    this.subject$.complete();
  }

  /**
   * Wires the service to the MQTT client: incoming messages plus the
   * connection state.
   *
   * Connection changes are tracked through the `state` stream only. The
   * previous per-event hooks (connect/reconnect/close/offline) each opened
   * an additional, never-released `state` subscription, so every state
   * change ended up being emitted once per event seen so far.
   * NOTE(review): relies on ngx-mqtt pushing connect, reconnect and close
   * transitions into `state` - confirm for the installed version.
   */
  private mqttSubscribeEvents() {
    this.subscriptions.push(
      this._mqttService.onMessage.subscribe((message: IOnMessageEvent) => this.onNewMessage(message))
    );

    this.onConnectionStateChange();
  }

  /** Unsubscribes everything the service subscribed to and forgets it. */
  private mqttUnsubscribeEvents() {
    this.subscriptions.forEach(subscription => subscription.unsubscribe());
    this.subscriptions = [];
    this.stateSubscription = undefined;
    this.topicSubscription = undefined;
  }

  /** Publishes an event to everyone registered through `on`. */
  private emit(event: MQTTEmitEvent) {
    this.subject$.next(event);
  }

  /**
   * Drops the current connection and connects to `ip` on port 8083,
   * path `/mqtt`, using the `wss` protocol.
   *
   * @param ip Host name or address of the broker. Null, undefined, empty
   *           and whitespace-only values are ignored; surrounding
   *           whitespace is stripped.
   */
  connect(ip: string) {
    const host = typeof ip === 'string' ? ip.trim() : '';
    if (host === '') {
      return;
    }

    this._mqttService.disconnect(true);

    const opt: IMqttServiceOptions = {
      hostname: host,
      port: 8083,
      path: '/mqtt',
      protocol: 'wss',
    };

    this._mqttService.connect(opt);
  }

  /**
   * Registers a listener for one kind of event.
   *
   * @param event  Event kind to listen for.
   * @param action Callback (or observer) that receives the event's `value`.
   * @returns The subscription; unsubscribe it to stop listening.
   */
  on(event: MQTTEvents, action: any): Subscription {
    return this.subject$.pipe(
      filter((e: MQTTEmitEvent) => e.name === event),
      map((e: MQTTEmitEvent) => e.value)
    ).subscribe(action);
  }

  /**
   * Starts tracking the client's connection state. For every state value
   * it updates `isConnected`, emits `MQTTEvents.onConnectionStateChange`
   * with that boolean, and (re)establishes the topic subscription when
   * connected.
   *
   * Calling this again while the tracking subscription is still open is a
   * no-op, so repeated calls cannot stack up duplicate listeners.
   */
  onConnectionStateChange() {
    if (this.stateSubscription && !this.stateSubscription.closed) {
      return;
    }

    this.stateSubscription = this._mqttService.state.subscribe(v => {
      this.isConnected = v === MqttConnectionState.CONNECTED;

      this.emit({name: MQTTEvents.onConnectionStateChange, value: this.isConnected});

      if (this.isConnected) {
        this.subscribeNewTopic();
      }

      console.log('mqtt is connected = ', this.isConnected);
    });
    this.subscriptions.push(this.stateSubscription);
  }

  /** Forwards a message received from the client as `MQTTEvents.onNewMessage`. */
  onNewMessage(message: IOnMessageEvent) {
    this.emit({name: MQTTEvents.onNewMessage, value: message});
  }

  /**
   * Subscribes to the configured topic filter. The subscription's own
   * handler is intentionally empty: messages are delivered to listeners
   * through the `onMessage` hook instead.
   *
   * Repeated calls while that subscription is still open are no-ops, so
   * reconnects do not pile up idle listeners.
   */
  subscribeNewTopic(): void {
    if (this.topicSubscription && !this.topicSubscription.closed) {
      return;
    }

    this.topicSubscription = this._mqttService.observe(this.topic).subscribe((_message: IMqttMessage) => {});
    this.subscriptions.push(this.topicSubscription);

    console.log('subscribed to topic: ' + this.topic);
  }

  /** Publishes the topic/message pair described by `packet`. */
  sendPacket(packet: IMQTTCommand) {
    this.sendMessage(packet.topic, packet.message);
  }

  /**
   * Publishes `msg` on `topic` with QoS 0 and without the retain flag.
   *
   * @param topic Topic to publish to.
   * @param msg   Message body.
   */
  sendMessage(topic: string, msg: string) {
    this._mqttService.unsafePublish(topic, msg, { qos: 0, retain: false });

    console.log('Send Message', topic, msg);
  }
}
