import { Device, UniPi } from '../models/device';
import * as mqtt from 'mqtt';
import { Injectable } from '@angular/core';
import {Buffer} from 'buffer';
import { EnvConfigurationService } from './env-config.service';
import { LocalStorageUtils } from './localstorage.utils';

export enum MqttEvent {
    item = 1,
    scrap = 2,
    productsChanged = 20,
    stopsChanged = 21,
    productionScheduleChanged = 24,
    deviceStateChanged = 40,
    updateDevice = 99
}

export interface MqttMessage {
    topic: string;
    payload: Buffer;
}

export enum DataType {
  units = 1,
  dropped = 0,
  connection = 2
}

@Injectable()
export class MqttService {

    connected = false;
    client: any = null;
    subscriptions: string[] = [];
    device: Device = null;
    unipi: UniPi = null;
    server = {
        label: 'Production',
        httpUrl: this._envSetting.settings.apiBase,
        mqtt: {
            host: this._envSetting.settings.mqttHost,
            port: this._envSetting.settings.mqttPort,
            protocol: this._envSetting.settings.mqttProtocol,
            path: this._envSetting.settings.mqttPath
        }
    };

    constructor(
        private _envSetting: EnvConfigurationService,
        private _localStorageUtils: LocalStorageUtils) { }

    /**
     * Publish a message to the current connected device
     *
     * @param type
     * @param {string} message
     * @returns {Promise<any>}
     *
     * @memberOf MqttService
     */
    async publishDeviceAPIandDisconnect(type: number, message?: Buffer): Promise<any> {
        if (!this.device) {
            throw new Error('Can\'t pub to device: device not connected');
        }

        // build the payload
        const typeBuf = Buffer.alloc(1);
        typeBuf.writeUIntBE(type, 0, 1);

        const inputBuffers = [typeBuf];
        if (message) {
            inputBuffers.push(message);
        }

        await this.publishBuffer(`10/${this.device.id}`, Buffer.concat(inputBuffers));
        await this.disconnect();
    }

    /**
     * Publish
     *
     * @param {string} topic
     * @param {Buffer} message
     * @returns {Promise<any>}
     *
     * @memberOf MqttService
     */
    publishBuffer(topic: string, message: Buffer): Promise<any> {
        if (this.connected) {
            console.log('sending message...', topic, message);
            return new Promise((resolve, reject) => {
                this.client.publish(topic, message, (err) => {
                    if (err) {
                        console.log('error', err);
                        reject(err);
                    } else {
                        console.log('published');
                        resolve(null);
                    }
                });
            });
        }
    }

    /**
     * Subscribe to a mqtt channel
     *
     * @returns {Promise<any>}
     *
     * @memberOf MqttService
     */
    subscribe(topic: string): Promise<any> {
        if (!this.connected) {
            return Promise.reject(new Error('mqtt Client not connected'));
        }
        if (this.subscriptions.indexOf(topic) === -1) {
            this.client.subscribe(topic);
            this.subscriptions.push(topic);
        }
        return Promise.resolve();
    }
    
    async subscribeAll(topics: string[]): Promise<any> {
        if (!this.connected) {
            return Promise.reject(new Error('mqtt Client not connected'));
        }
        await Promise.all(topics.map(topic => { 
            this.client.subscribe(topic);
            
        }));
        this.subscriptions = this.subscriptions || [];
        this.subscriptions = this.subscriptions.concat(topics);
        return Promise.resolve();
    }

    /**
     * Subscribe to a mqtt channel
     *
     * @returns {Promise<any>}
     *
     * @memberOf MqttService
     */
    unsubscribe(topic: string): Promise<any> {
        if (!this.connected) {
            return Promise.reject(new Error('mqtt Client not connected'));
        }
        if (this.subscriptions.indexOf(topic) >= 0) {
            this.client.unsubscribe(topic);
            this.subscriptions.splice(this.subscriptions.indexOf(topic), 1);
        }
        return Promise.resolve();
    }

    /**
     * Disconnect from mqtt
     *
     * @returns {Promise<any>}
     *
     * @memberOf MqttService
     */
    disconnect(): Promise<any> {
        if (!this.connected) {
            return Promise.resolve();
        } else {
            console.log('Disconnecting from mqtt');
            return new Promise(resolve => {
                this.client.end(false, () => {
                    this.connected = false;
                    this.client = null;
                    this.subscriptions = [];
                    this.device = null;
                    console.log('Disconnected from mqtt');
                    resolve(null);
                });
            });
        }
    }

    /**
     * Connect to mqtt broker. Reset the connection if it's already connected
     *
     * @returns {Promise<any>}
     *
     * @memberOf MqttService
     */
    reconnect(username: string, password: string): Promise<any> {
        return this.disconnect()
            .then(() => this.connect(username, password));
    }

    isConnected() {
        return this.connected;
    }

    async connectWebUser(): Promise<any> {
        if (this.isConnected()) {
            return true;
        }
        if (localStorage.getItem("token")) {
            try {
                await this.connect(this._localStorageUtils.readFromLocalStorage('user'), this._localStorageUtils.readFromLocalStorage('token'));
                return true;
            } catch (err) {
                console.error(err);
                return false;
            }
        } 
        
        return false;
    }

    /**
     * Connect to mqtt broker
     *
     * @returns {Promise<any>}
     *
     * @memberOf MqttService
     */
    connect(username: string, password: string): Promise<any> {
        if (this.connected) {
            return Promise.resolve(this.client);
        }

        console.log(`Connecting to mqtt`);
        return new Promise((resolve, reject) => {
            const host = this.server.mqtt.host;
            const port = this.server.mqtt.port;
            const protocol = this.server.mqtt.protocol;
            const path = this.server.mqtt.path;
            let connectionUrl = `${protocol}://${host}`;

            if (port) {
                connectionUrl = `${connectionUrl}:${port}`;   
            }
            if (path) {
                connectionUrl = `${connectionUrl}/${path}`;
            }

            let client = (<any>mqtt).connect(connectionUrl, {
                rejectUnauthorized: false,
                username: username,
                password: password
            });

            client.on('connect', async () => {
                console.log('Connected to mqtt');
                this.connected = true;
                this.client = client;
                this.device = null;
                resolve(client);
            });
            client.on('error', err => {
                console.error('Can\'t connect to mqtt: ' + err);
                client.end();
                reject(err);
            });
            client['stream'].on('error', (err) => {
                // fires when connection can't be established
                // but not when an established connection is lost
                console.error('Can\'t connect to mqtt: ' + err);
                client.end();
                reject(err);
            });
        });
    }

    /**
   * compute bitwise not in 32bit
   *
   * @param {number} n
   * @returns {number}
   */
    not32(n: number): number {
        return (~n & Math.pow(2, 32) - 1) >>> 0;
    }

    /**
     *
     * Check if the number has a sign in 32 bit representation and convert to a valid js signed number
     *
     * @param {number} n
     * @returns {number}
     *
     * @memberOf MqttService
     */
    checkSign32(n: number): number {
        if (n > 2147483647) {
            return (- this.not32(n) - 1);
        } else {
            return n;
        }
    }

    /**
     * Notify an event with mqtt
     *
     * @param {UniPi} unipi
     * @param {Device} device
     * @param {MqttEvent} mqttEvent
     * @param {Buffer} [buf]
     */
    private async notify(topic: string, payload: Buffer) {
        // notify through mqtt
        await this.publishBuffer(topic, payload);
        await this.disconnect();
    }

    /**
     * Notify a general techmass event with mqtt
     *
     * @param {UniPi} unipi
     * @param {Device} device
     * @param {MqttEvent} MqttEvent
     */
    async notifyEvent(unipi: UniPi, device: Device, mqttEvent: MqttEvent) {
        const message = this.buildNotifyMessage(unipi.serial, mqttEvent);
        await this.notify(message.topic, message.payload);
    }

    /**
     * Build topic+payload of the notification required
     *
     * @param {number} deviceId
     * @param {MqttEvent} mqttEvent
     * @returns {MqttMessage}
     * @memberof MqttNotifyService
     */
    public buildNotifyMessage(deviceId: number | string, mqttEvent: MqttEvent): MqttMessage {
        const buf = Buffer.alloc(1);
        buf.writeUIntBE(mqttEvent, 0, 1);

        return {
            topic: '02/' + deviceId,
            payload: buf
        };
    }

}
