import { Mutex } from "async-mutex";
import _isEmpty from "lodash/isEmpty";
import _unset from "lodash/unset";
import { ToadScheduler, SimpleIntervalJob, AsyncTask } from "toad-scheduler";

import "fastestsmallesttextencoderdecoder/EncoderDecoderTogether.min";
import logger from "../logger";

const INTERVAL_S = 10;

// Ignore MQTT connectivity and force pull data every Xs
const FORCE_INTERVAL_S = 60;

const WECHAT_DECODER_TESTER_TOPIC = `wechatDecoderTester/${Math.random()}`;

export type Cookie = string

interface Subscription<T> {
  identifier: string;

  listeners: {
    [cookie: Cookie]: ((newData: T) => Promise<void>) | null;
  };

  mqttTopic?: string;
  mqttResolver?: (payload: any) => Promise<T>;
  httpRequest?: () => Promise<T>;

  // Once set, it will keep being executed even the original setter is removed
  storePersister?: (newData: T) => Promise<void>;

  uniqueIndex: number;
}

const generateMqttIdentifier = (identifier: string): string => `${identifier}-mqtt`;

export const createSubscriptionManager = (
  { mqttInstance }: { mqttInstance: any },
): {
  subscribe: (
    identifier: string,
    callback?: (newData: any) => Promise<void>,
    mqttTopic?: string,
    mqttResolver?: (payload: any) => Promise<any>,
    httpRequest?: () => Promise<any>,
    storePersister?: (newData: any) => Promise<void> | void,
    interval?: number,
  ) => Promise<Cookie>;
  unsubscribe: (
    identifier: string,
    cookie: Cookie,
  ) => Promise<void>;
} => {
  const subscriptions: {[identifier: string]: Subscription<any>} = {};

  const subscriptionsLock = new Mutex();

  const scheduler = new ToadScheduler();

  let wechatDecoderTesterResult = { verified: false };

  const pushData = async (identifier: string, data: any): Promise<void> => {
    const { listeners, storePersister } = subscriptions[ identifier ];
    await Promise.all([
      ...Object.values(listeners),
      storePersister,
    ].map((cb) => cb?.(data)));
  };

  const refreshData = async (identifier: string): Promise<void> => {
    const { httpRequest } = subscriptions[ identifier ];
    if (!httpRequest) return;
    logger.info(`Polling data for ${identifier}`);
    const data = await httpRequest?.();
    await pushData(identifier, data);
  };

  const refreshDataMqtt = async (identifier: string): Promise<void> => {
    const { mqttTopic } = subscriptions[ identifier ];
    if (!!mqttTopic && mqttInstance.connected && wechatDecoderTesterResult.verified) {
      logger.info("MQTT connected and in good quality, skip polling");
      return;
    }
    await refreshData(identifier);
  };

  mqttInstance.on("message", async (topic: string, message: ArrayBuffer) => {
    try {
      const decoded = new TextDecoder("utf-8").decode(message);
      const payload = JSON.parse(decoded);
      logger.info("Receive mqtt message", topic, payload);
      const targets = Object.values(subscriptions).filter((subscription) => subscription.mqttTopic === topic);
      await Promise.all(targets.map(async (target) => {
        const resolver = target.mqttResolver;
        const processedPayload = resolver ? resolver(payload) : payload;
        await pushData(target.identifier, processedPayload);
      }));
      if (topic === WECHAT_DECODER_TESTER_TOPIC) {
        wechatDecoderTesterResult = payload;
      }
    } catch (e) {
      console.error("Unable to decode mqtt message", e);
    // TODO: Fallback to polling
    }
  });

  const subscribe = async (
    identifier: string,
    callback?: (newData: any) => Promise<void>,
    mqttTopic?: string,
    mqttResolver?: (payload: any) => Promise<any>,
    httpRequest?: () => Promise<any>,
    storePersister?: (newData: any) => Promise<void>,
    interval: number = INTERVAL_S,
  ): Promise<Cookie> => subscriptionsLock.runExclusive(async () => {
    const cookie = `${identifier}-${String(subscriptions[ identifier ]?.uniqueIndex ?? 0)}`;
    logger.info(`Subscribe ${mqttTopic} ${identifier} ${cookie}`);
    if (subscriptions[ identifier ] != null) {
      subscriptions[ identifier ].listeners[ cookie ] = callback ?? null;
      subscriptions[ identifier ].storePersister = subscriptions[ identifier ].storePersister ?? storePersister;
      subscriptions[ identifier ].uniqueIndex += 1;

      return cookie;
    }

    const forceTask = new AsyncTask(
      identifier,
      () => refreshData(identifier),
      (error: Error) => {
        logger.error(error);
      },
    );
    const forceJob = new SimpleIntervalJob(
      { seconds: FORCE_INTERVAL_S },
      forceTask,
      identifier,
    );
    scheduler.addSimpleIntervalJob(forceJob);

    const mqttIdentifier = generateMqttIdentifier(identifier);
    const mqttTask = new AsyncTask(
      mqttIdentifier,
      () => refreshDataMqtt(identifier),
      (error: Error) => {
        logger.error(error);
      },
    );
    const mqttJob = new SimpleIntervalJob(
      { seconds: interval },
      mqttTask,
      mqttIdentifier,
    );
    scheduler.addSimpleIntervalJob(mqttJob);

    if (mqttTopic != null) {
      mqttInstance.subscribe(
        mqttTopic,
      // { properties: { userProperties: { identifier } } },
      );
    }

    subscriptions[ identifier ] = {
      identifier,
      listeners: { [ cookie ]: callback ?? null },
      mqttTopic,
      mqttResolver,
      httpRequest,
      storePersister,
      uniqueIndex: 1,
    };

    // Call http request immediately once
    const data = await httpRequest?.();
    await pushData(identifier, data);

    return cookie;
  });

  const unsubscribe = async (
    identifier: string,
    cookie: Cookie,
  ): Promise<void> => subscriptionsLock.runExclusive(async () => {
    logger.info(`Unsubscribe ${identifier} ${cookie}`);

    if (!subscriptions?.[identifier]?.listeners || !(cookie in subscriptions?.[identifier]?.listeners)) {
      logger.error(`Invalid cookie ${cookie}. Same cookie has been unregistered.`);
      return;
    }

    _unset(subscriptions, [ identifier, "listeners", cookie ]);
    if (_isEmpty(subscriptions?.[identifier]?.listeners)) {
      const { mqttTopic } = subscriptions[ identifier ];

      scheduler.removeById(identifier);
      scheduler.removeById(generateMqttIdentifier(identifier));

      if (mqttTopic != null) {
        mqttInstance.unsubscribe(mqttTopic);
      }

      _unset(subscriptions, [ identifier ]);
    }
  });

  // Send and receive a message in tester topic to ensure the wechat decoder quality
  mqttInstance.subscribe(WECHAT_DECODER_TESTER_TOPIC);
  mqttInstance.publish(WECHAT_DECODER_TESTER_TOPIC, `{"verified":true}`, { retain: true });

  return { subscribe, unsubscribe };
};

export default createSubscriptionManager;
