import _ from 'lodash';
import { EventEmitter } from 'events';
import MQTT from 'mqtt';

import ErrorReporter from '../error_reporter';
import qconsole from 'scripts/lib/qconsole';
import { isDevelopment } from 'scripts/application/lib/environment';

const CAN_LOG_PAYLOADS = isDevelopment() || /\.gladly\.qa/.test(window.location.host);
const DEFAULT_KEEPALIVE = 10; // seconds
export const MAX_JITTER = 30000;
const JITTER = _.random(0, MAX_JITTER); // random % of jitter for reconnects, up to 30s

export default class MqttClient {
  /**
   * Creates an instance of MqttBackend with the specified MQTT.js client factory.
   *
   * @param url - the url for the mqtt endpoint
   */
  constructor({ url, eventRecorder, clientId }) {
    this._url = url;
    this._mqttClient = null;
    this._topicSubscriptions = {};
    this._emitter = new EventEmitter().setMaxListeners(0);
    this._clientId = clientId;
    this.eventRecorder = eventRecorder;
    this._isServerV2 = false;
    this._keepAlive = DEFAULT_KEEPALIVE;

    _.bindAll(this, ['onMessage']);
  }

  // connection uses a cookie to authenticate
  connect(username, orgId) {
    let url = this._isServerV2 ? this._url : `${this._url}/${orgId}`;

    this._mqttClient = MQTT.connect(url, {
      username,
      clientId: this._clientId,
      protocolVersion: 4,
      resubscribe: false,
      transformWsUrl: (wsUrl, options, client) => {
        client.options.keepalive = this._keepAlive;
        client.options.clean = !this._isServerV2;
        wsUrl = this._isServerV2 ? this._url : `${this._url}/${orgId}`;
        qconsole.log(`MQTT - connecting with keepalive ${client.options.keepalive}, clean ${client.options.clean}`);
        return wsUrl;
      },
    });

    this._registerEventHandlers();
  }

  configureMqttServerV2(isServerV2) {
    if (this._isServerV2 === isServerV2) {
      return;
    }
    setTimeout(() => {
      this._isServerV2 = isServerV2;
      this._mqttClient.end();
      this._mqttClient.reconnect();
    }, 1500);
  }

  multiplyDefaultMqttKeepAlive(multiplier) {
    this._keepAlive = DEFAULT_KEEPALIVE * multiplier;
  }

  reset() {
    this.emit('reset');
    if (this._mqttClient) {
      this.unsubscribeAll();
      this._unregisterEventHandlers();
      this._mqttClient.end(() => qconsole.log('MQTT - client shut down'));
      this._mqttClient = null;
    }
  }

  close() {
    this.reset();
    this._emitter.removeAllListeners();
  }

  /**
   * Registers event handlers.
   * @see EventEmitter
   *
   * @param event - one of
   *                * 'disconnect': invoked with the connection to the broker is lost
   *                * 'connect': invoked with the connection with the broker is established
   *                * 'error': invoked when an unexpected error occurs, such as failure to subscribe,
   *                           or a fatal connection error
   *                * 'close': invoked when the connection is explicitly closed by the client
   * @param listener a handler function for the above event
   */
  on(event, listener) {
    this._emitter.on(event, listener);
  }

  off(event, listener) {
    this._emitter.removeListener(event, listener);
  }

  emit(event, ...args) {
    this._emitter.emit(event, ...args);
  }

  /**
   * Remove a listener from the listener array for the specified event.
   * @param event the name of the event
   * @param listener the listener function to remove
   */
  removeListener(event, listener) {
    this._emitter.removeListener(event, listener);
  }

  _registerEventHandlers() {
    this._mqttClient.on('offline', () => {
      this._recordMqttEvent(null, 'MQTT', 'disconnected from broker');
      qconsole.log('MQTT - disconnected from broker');
      clearTimeout(this.reconnectTimeout);

      if (!this._isOffline) {
        // Don't emit when we know we're already offline
        this.emit('disconnect');
      }

      this._isOffline = true;
    });

    // This event typically will happen together with "offline"
    this._mqttClient.on('close', () => {
      if (!this._isOffline) {
        this._isOffline = true;
        this.emit('disconnect');
      }
    });

    this._mqttClient.on('connect', () => {
      this._recordMqttEvent(null, 'MQTT', 'connected to broker');
      qconsole.log('MQTT - connected to broker');
      this.emit('connect');

      if (this._isOffline) {
        // Only emit on reconnect
        this._isOffline = false;
        this._resubscribe();

        this.reconnectTimeout = setTimeout(() => this.emit('reconnect'), JITTER);
      }
    });

    this._mqttClient.on('error', err => {
      if (err.message && err.message === 'Connection refused: Not authorized') {
        qconsole.log('MQTT - unauthorized connection to mqtt');
        this.emit('unauthorized');
        return;
      }

      ErrorReporter.reportError(err, { message: 'MQTT - unexpected error occurred' });
      this.emit('error', err);
    });

    this._mqttClient.on('message', this.onMessage);
  }

  get isOffline() {
    return this._isOffline;
  }

  onMessage(topic, message) {
    let json = JSON.parse(message);
    if (CAN_LOG_PAYLOADS) {
      qconsole.log(`MQTT - received message with ${topic}`, json);
    } else {
      qconsole.log(`MQTT - received message with ${topic}`);
    }
    this._eachTopicSubscription(t => t.message(json, topic));

    this._recordMqttEvent(json.correlationId, 'MSG', topic);
  }

  publish(topic, message, attempt = 1) {
    if (CAN_LOG_PAYLOADS) {
      qconsole.log(`MQTT - publishing message with ${topic}`, message);
    } else {
      qconsole.log(`MQTT - publishing message with ${topic}`);
    }

    this._recordMqttEvent(message.correlationId, 'PUB', topic);
    this._mqttClient.publish(topic, JSON.stringify(message), { qos: 1 }, err => {
      if (err && err.toString() === 'Error: client disconnecting' && attempt < 11) {
        setTimeout(() => this.publish(topic, message, attempt + 1), attempt * 100);
      } else if (err) {
        qconsole.error(`MQTT - publishing to ${topic} failed`, err);
      } else {
        qconsole.log(`MQTT - publishing to ${topic} acknowledged`);
      }
    });
  }

  /**
   * Subscribes the provided `onReceive` handler (a subscriber) to the topic.
   *
   *
   * @param topic - the topic to subscribe to, e.g. "server/v1/customer/123/profile"
   * @param onReceive - a function to handle messages for the topic.
   *                    It will be called every time a new message arrives.
   * @param cb - an optional callback to be invoked when subscription is acknowledged
   */
  subscribe(topic, onReceive, cb) {
    if (!_.isFunction(onReceive)) {
      throw new Error('onReceive must be a function');
    }

    if (!_.isUndefined(cb) && !_.isFunction(cb)) {
      throw new Error('cb must be a function');
    }

    this._getTopicSubscription(topic).add(onReceive, cb);
  }

  _getTopicSubscription(topic) {
    if (this._topicSubscriptions[topic]) {
      return this._topicSubscriptions[topic];
    }
    let topicSubscription = (this._topicSubscriptions[topic] = new TopicSubscription(topic));
    topicSubscription.onEmpty(() => this._rmTopicSubscription(topic));
    this._subscribe(topic);
    return topicSubscription;
  }

  _subscribe(topic, backoffDelay = 30000) {
    let topicSubscription = this._topicSubscriptions[topic];
    if (!topicSubscription) {
      return;
    }

    // Don't try to subscribe after `this.reset()` has happened. This can
    // happen if the subscribe retry timeout occurs before the mqtt
    // disconnect event is fired DEV-13208.
    // Also, do not try to subscribe when the client is offline (may happen during frequent
    // disconnect-reconnects)
    if (!this._mqttClient || this._isOffline) {
      if (this._isOffline) {
        qconsole.log(`MQTT - attempt to subscribe to ${topic} while offline/disconnected`);
      }
      return;
    }

    let timeoutDelay = Math.floor(backoffDelay + Math.random() * backoffDelay);
    let timeout = setTimeout(() => {
      let errMsg = 'MQTT - subscribe timeout';
      ErrorReporter.reportError(new Error(errMsg), {
        tags: { mqttTopic: topic, backoffDelay, timeoutDelay },
      });
      this._subscribe(topic, Math.min(backoffDelay * 2, 60000));
    }, timeoutDelay);

    let cancelTimeout = () => clearTimeout(timeout);
    this.on('disconnect', cancelTimeout);

    this._recordMqttEvent(null, 'SUB', topic);
    this._mqttClient.subscribe(topic, { qos: 1 }, (err, granted) => {
      this.off('disconnect', cancelTimeout);
      cancelTimeout();

      if (err) {
        this._recordMqttEvent(null, 'SUBERR', topic);
        ErrorReporter.reportError(err, { message: 'MQTT - could not subscribe', tags: { mqttTopic: topic } });
        return topicSubscription.fail(err);
      }

      if (!granted || !granted[0]) {
        this._recordMqttEvent(null, 'SUBERR', topic);
        const errMsg = 'MQTT - subscription not granted';
        ErrorReporter.reportError(new Error(errMsg), { message: errMsg, tags: { granted, mqttTopic: topic } });
        return topicSubscription.fail(err);
      }

      if (granted[0].qos === 128) {
        this._recordMqttEvent(null, 'SUBERR', topic);
        // qos is a misnomer from the mqtt library. This is the return code, which generally matches subscribe qos
        // but in the case of unauthorized, return code is 128
        qconsole.error(`MQTT - unauthenticated or unauthorized subscription to ${topic}`);
        this.emit('unauthorized');
        return topicSubscription.fail(new Error('Not authorized'));
      }

      this._recordMqttEvent(null, 'SUBACK', topic);
      qconsole.log(`MQTT - subscription to ${granted[0].topic} granted with QoS ${granted[0].qos}`);

      return topicSubscription.ready();
    });

    return topicSubscription;
  }

  /**
   * Removes the topic's `onReceive` handler (a subscriber).
   *
   * @param topic - the topic to remove the handler from.
   * @param onReceive - the handler to remove. It must be the exact same handler as was passed in `subscribe`,
   *                    i.e. it mas the `===` identity test
   */
  unsubscribe(topic, onReceive) {
    this._getTopicSubscription(topic).rm(onReceive);
  }

  unsubscribeAll() {
    this._eachTopicSubscription(t => t.rmAll());
  }

  _resubscribe() {
    this._eachTopicSubscription((subscription, topic) => this._subscribe(topic));
  }

  _eachTopicSubscription(callback) {
    _.forEach(this._topicSubscriptions, (subscription, pattern) => {
      if (subscription) {
        // guard against subscription being removed by a previous invocation
        // of the callback
        // DEV-9611
        callback(subscription, pattern);
      }
    });
  }

  _rmTopicSubscription(topic) {
    delete this._topicSubscriptions[topic];
    this._recordMqttEvent(null, 'UNSUB', topic);
    this._mqttClient.unsubscribe(topic, () => qconsole.log(`MQTT - unsubscription from ${topic} acknowledged`));
  }

  _recordMqttEvent(correlationId, type, topic) {
    this.eventRecorder && this.eventRecorder.recordEvent(correlationId, `${type} ${topic}`);
  }

  _unregisterEventHandlers() {
    if (!this._mqttClient) return;

    this._mqttClient.off('message', this.onMessage);
  }
}

export class TopicSubscription extends EventEmitter {
  constructor(pattern) {
    super();
    this.pattern = pattern;
    this.regex = createTopicRegex(pattern);
    this.isReady = false;
  }

  add(onMessage, onReady) {
    if (this.listeners('message').includes(onMessage)) {
      qconsole.log(`MQTT - ${this.pattern} already has this subscriber`);
    } else {
      this.on('message', onMessage);
    }

    if (onReady) {
      this._onReady(onReady);
    }
  }

  rm(onMessage) {
    this.removeListener('message', onMessage);
    if (this.listenerCount('message') === 0) {
      this._empty();
    }
  }

  rmAll() {
    this.removeAllListeners('message');
    this._empty();
  }

  onEmpty(onEmpty) {
    this.once('empty', onEmpty);
  }

  ready() {
    this.isReady = true;
    this.emit('ready', null);
  }

  fail(err) {
    this.emit('ready', err);
  }

  message(message, topic) {
    if (!this.regex.test(topic)) {
      return;
    }

    // not using `emit` b/c of custom error handling
    this.listeners('message').forEach(listener => {
      try {
        listener(message, topic);
      } catch (err) {
        let errParams = {
          message: 'MQTT - could not process incoming message',
          tags: { mqttTopic: topic },
          extra: {
            topic,
            messageLength: JSON.stringify(message).length,
          },
        };

        // NOTE: only log payload on internal domains to protect client security
        if (/\.gladly\.qa/.test(window.location.host)) {
          errParams.extra.message = message;
        }
        ErrorReporter.reportError(err, errParams);
      }
    });
  }

  _onReady(onReady) {
    if (this.isReady) {
      setImmediate(onReady);
    } else {
      this.once('ready', onReady);
    }
  }

  _empty() {
    this.emit('empty');
    this.removeAllListeners();
  }
}

/**
 * Replaces wildcards, + and #, with regular expressions
 * @param topic with wildcards
 * @returns {RegExp}
 */
function createTopicRegex(topic) {
  return new RegExp(
    `^${topic
      .replace(/[.*?^${}()|[\]\\]/g, '\\$&') // escape all RegExp tokens except for `+`
      .replace(/\+/g, '[^/]+')
      .replace('#', '.*')}$`
  );
}
