import * as AWS from '@aws-sdk/client-kinesis';
import slsConfig from '../../sls-stack-output.json';
import { dispatcher, store } from 'store';
// Shared UTF-8 decoder; reduceData uses it to turn each record's binary `Data`
// payload into text before JSON-parsing it.
const decoder = new TextDecoder('utf-8');
// Topic names that reduceData keeps. A record's topic is the last '/'-separated
// segment of its PartitionKey; records with any other topic are dropped.
const tablesToGet = ['Site', 'Unit', 'PCS', 'Battery', 'Rack', 'PV', 'MKT', 'Status'];

/**
 * Removes a trailing 'UI' marker from a topic name.
 *
 * @param {string} topic - Topic name, e.g. 'SiteUI' or 'Site'.
 * @returns {string} The topic without its final two characters when it ends
 *   with 'UI'; otherwise the topic unchanged.
 */
function cutTopic(topic) {
  const uiSuffix = 'UI';
  return topic.endsWith(uiSuffix) ? topic.slice(0, -uiSuffix.length) : topic;
}

/**
 * Decodes a Kinesis GetRecords response and groups the records relevant to one
 * site by topic and then by serial number.
 *
 * Only records whose topic (last '/'-separated segment of `PartitionKey`) is
 * listed in `tablesToGet` are considered. Each kept record's `Data` is decoded
 * as UTF-8 JSON and tagged with `_topic` (the full partition key).
 *
 * @param {{Records?: Array<{PartitionKey: string, Data: Uint8Array}>}} data
 *   GetRecords response; a missing `Records` list is treated as empty.
 * @param {string} siteSN - Serial number of the site being displayed.
 * @param {string[]} unitsSNs - Serial numbers of the site's units.
 * @returns {Object<string, Object<string, Object[]>>} `{ topic: { SN: records } }`
 *   where every record list is ordered newest first (descending `TS`). A topic
 *   that had records but none for this site maps to an empty object.
 * @throws {SyntaxError} When a kept record's payload is not valid JSON.
 */
function reduceData(data, siteSN, unitsSNs) {
  const unitTables = ['Unit', 'PCS', 'Rack', 'Battery'];
  const statusSNs = [siteSN, ...unitsSNs];
  const marketSNs = [siteSN, `${siteSN}_GEN_Index`, `${siteSN}_LR_Index`];

  // Decides whether a record of `table` with serial number `sn` belongs to this site.
  const belongsToSite = (table, sn) => {
    switch (table) {
      case 'Site':
        return sn === siteSN;
      case 'Status':
        return statusSNs.includes(sn);
      case 'MKT':
        // The previous check tested the array literal itself (always truthy),
        // which let market records of every site through.
        return marketSNs.includes(sn);
      case 'PV':
        return typeof sn === 'string' && sn.startsWith(siteSN);
      default:
        return unitTables.includes(table) && unitsSNs.includes(sn);
    }
  };

  // Pass 1: keep and decode the records of the topics we care about.
  const decodedByTopic = {};
  for (const record of data?.Records ?? []) {
    const topic = record.PartitionKey.split('/').pop();
    if (!tablesToGet.includes(topic)) {
      continue;
    }
    const payload = JSON.parse(decoder.decode(record.Data));
    payload._topic = record.PartitionKey;
    if (!decodedByTopic[topic]) {
      decodedByTopic[topic] = [];
    }
    decodedByTopic[topic].push(payload);
  }

  // Pass 2: newest first, then bucket by SN, dropping records of other sites.
  const groupedData = {};
  Object.keys(decodedByTopic).forEach((topic) => {
    const table = cutTopic(topic);
    const newestFirst = decodedByTopic[topic].sort((a, b) => {
      if (a.TS === b.TS) {
        return 0;
      }
      return a.TS > b.TS ? -1 : 1;
    });
    groupedData[topic] = newestFirst.reduce((bySN, payload) => {
      if (belongsToSite(table, payload.SN)) {
        if (!bySN[payload.SN]) {
          bySN[payload.SN] = [];
        }
        bySN[payload.SN].push(payload);
      }
      return bySN;
    }, {});
  });
  return groupedData;
}

/**
 * Tells whether a stored entry may still be (re)written by the Kinesis backfill:
 * either nothing is stored yet, or what is stored carries the `initKinesisMsg`
 * marker (i.e. it was written during the pass that is still running).
 */
function isReplaceableByInit(entry) {
  return !entry || Boolean(entry.initKinesisMsg);
}

/**
 * Builds an updated copy of a `{ SN: record }` dictionary from freshly reduced
 * records, taking the first (newest) record of every SN that is still replaceable.
 * Returns `null` when nothing qualified, so the caller can leave the key unset.
 */
function mergeInitRecords(current, incoming) {
  let merged = null;
  Object.keys(incoming).forEach((sn) => {
    if (!isReplaceableByInit(current?.[sn])) {
      return;
    }
    // Copy the current dictionary once and keep adding to that same copy;
    // re-copying per SN would drop the entries merged for earlier SNs.
    merged = merged ?? { ...current };
    merged[sn] = { ...incoming[sn][0], initKinesisMsg: true };
  });
  return merged;
}

/** Removes the `initKinesisMsg` marker from a record, in place, when it is set. */
function clearInitFlag(record) {
  if (record?.initKinesisMsg) {
    delete record.initKinesisMsg;
  }
}

/**
 * Action creators that backfill the store from the real-time Kinesis stream.
 * Every method returns a function that is handed to `dispatch` (the store is
 * presumably configured with thunk middleware — confirm in the store setup).
 */
export const kinesis = {
  /**
   * Lists the shards of the real-time data stream.
   *
   * @param {object} kinesis - Kinesis client exposing `listShards`.
   * @returns {function(): Promise<object[]>} Thunk resolving to the shard list
   *   (an empty array when the response has no `Shards`).
   */
  getShards(kinesis) {
    return async () => {
      const { Shards } = await kinesis.listShards({
        StreamName: slsConfig.RTDataKinesisStream,
      });
      return Shards ?? [];
    };
  },

  /**
   * Requests one shard iterator per shard.
   *
   * @param {object} kinesis - Kinesis client exposing `getShardIterator`.
   * @param {Array<{ShardId: string}>} shards - Shards to open.
   * @param {string} type - ShardIteratorType, e.g. 'LATEST' or 'AT_TIMESTAMP'.
   * @param {Date|string|number} [ts] - Start time for 'AT_TIMESTAMP'. Anything
   *   accepted by `new Date(...)` works; it is normalised to a `Date` because
   *   the SDK v3 request shape types `Timestamp` as a `Date`.
   * @returns {function(): Promise<string[]>} Thunk resolving to the iterators,
   *   in the same order as `shards`.
   */
  getInitShardIterators(kinesis, shards, type, ts) {
    return async () => {
      const timestampParam = ts
        ? { Timestamp: ts instanceof Date ? ts : new Date(ts) }
        : {};
      return Promise.all(shards.map(async ({ ShardId }) => {
        const { ShardIterator } = await kinesis.getShardIterator({
          ShardId,
          StreamName: slsConfig.RTDataKinesisStream,
          ShardIteratorType: type,
          ...timestampParam,
        });
        return ShardIterator;
      }));
    };
  },

  /**
   * Reduces one GetRecords response and dispatches the resulting partial state,
   * one `UPDATE_<MODULE>_MODULE` action per module (site, vpp, devices,
   * messages, calendar), even when a module's payload is empty.
   *
   * An entry is only written when the store has no value for it yet or the
   * stored value still carries `initKinesisMsg`; every value written here is
   * tagged with `initKinesisMsg: true`.
   *
   * @param {object} data - GetRecords response.
   * @param {string} siteSN - Serial number of the current site.
   * @param {string[]} unitsSNs - Unit serial numbers, in display order.
   * @param {string[]} unitNames - Unit names, index-aligned with `unitsSNs`.
   * @param {function} dispatch - Store dispatch.
   * @returns {function(): void} Thunk performing the update.
   */
  processData(data, siteSN, unitsSNs, unitNames, dispatch) {
    return () => {
      const moduleState = store.getState();
      const reducedData = reduceData(data, siteSN, unitsSNs);
      const newState = { site: {}, vpp: {}, devices: {}, messages: {}, calendar: {} };

      const siteRecords = reducedData?.Site?.[siteSN];
      if (siteRecords && isReplaceableByInit(moduleState.site.currentSite)) {
        newState.site.currentSite = { initKinesisMsg: true, ...siteRecords[0] };
      }

      const incomingUnits = reducedData?.Unit ?? {};
      if (Object.keys(incomingUnits).length) {
        const unitTable = [...(moduleState.devices.UnitTable || [])];
        let hasNewUnit = false;
        Object.keys(incomingUnits).forEach((sn) => {
          const unit = {
            ...incomingUnits[sn][0],
            UnitName: unitNames[unitsSNs.indexOf(sn)],
            initKinesisMsg: true,
          };
          const position = unitTable.findIndex((u) => u.SN === sn);
          if (position === -1) {
            unitTable.push(unit);
            hasNewUnit = true;
          } else if (unitTable[position]?.initKinesisMsg) {
            unitTable[position] = unit;
          }
        });
        if (hasNewUnit) {
          // Keep the table in the order given by unitsSNs.
          unitTable.sort((a, b) => unitsSNs.indexOf(a.SN) - unitsSNs.indexOf(b.SN));
        }
        newState.devices.UnitTable = unitTable;
      }

      // [topic, state module, dictionary key] for the plain SN -> record dictionaries.
      const dictionaries = [
        ['PCS', 'devices', 'PCSDict'],
        ['Rack', 'devices', 'rackDict'],
        ['Battery', 'devices', 'batteryDict'],
        ['Status', 'site', 'statusDict'],
      ];
      dictionaries.forEach(([topic, moduleName, dictKey]) => {
        const merged = mergeInitRecords(
          moduleState[moduleName][dictKey],
          reducedData?.[topic] ?? {}
        );
        if (merged) {
          newState[moduleName][dictKey] = merged;
        }
      });

      // NOTE(review): this spreads the whole record array (keys '0', '1', ...)
      // rather than its newest element, and has no replaceability guard. Kept
      // as-is because the consumer of `currentPV` is not visible here — confirm.
      if (reducedData?.PV?.[siteSN]) {
        newState.site.currentPV = { ...reducedData.PV[siteSN], initKinesisMsg: true };
      }

      const incomingMarket = reducedData?.MKT ?? {};
      let indexes = null;
      Object.keys(incomingMarket).forEach((sn) => {
        const latest = { ...incomingMarket[sn][0], initKinesisMsg: true };
        if (sn.includes('Index')) {
          // `<site>_GEN_Index` / `<site>_LR_Index` records feed currentIndexes.
          const fieldName = sn.includes('_GEN_') ? 'GEN_Index' : 'LR_Index';
          if (isReplaceableByInit(moduleState.site.currentIndexes?.[fieldName])) {
            // Accumulate on one copy so GEN and LR from the same batch both survive.
            indexes = indexes ?? { ...moduleState.site.currentIndexes };
            indexes[fieldName] = latest;
          }
        } else if (isReplaceableByInit(moduleState.site.currentMKT)) {
          newState.site.currentMKT = latest;
        }
      });
      if (indexes) {
        newState.site.currentIndexes = indexes;
      }

      Object.keys(newState).forEach((moduleName) => {
        dispatch(dispatcher(`UPDATE_${moduleName.toUpperCase()}_MODULE`, newState[moduleName]));
      });
    };
  },

  /**
   * Strips the `initKinesisMsg` marker from everything `processData` may have
   * written, so later (older) backfill passes no longer overwrite those values.
   *
   * NOTE(review): this mutates the objects returned by `store.getState()` in
   * place and dispatches nothing; `dispatch` is accepted but unused.
   *
   * @param {function} dispatch - Unused; kept for signature compatibility.
   * @returns {function(): void} Thunk clearing the markers.
   */
  clearServiceFlag(dispatch) {
    return () => {
      const { devices, site } = store.getState();

      clearInitFlag(site?.currentSite);
      clearInitFlag(site?.currentMKT);
      clearInitFlag(site?.currentPV);
      clearInitFlag(site?.currentIndexes?.GEN_Index);
      clearInitFlag(site?.currentIndexes?.LR_Index);

      (devices?.UnitTable ?? []).forEach(clearInitFlag);
      [devices?.PCSDict, devices?.rackDict, devices?.batteryDict, site?.statusDict]
        .forEach((dict) => {
          Object.values(dict ?? {}).forEach(clearInitFlag);
        });
    };
  },

  /**
   * Seeds the store with the most recent stream data for a site: first from the
   * tip of every shard ('LATEST'), then from progressively older windows
   * starting 5, 15, 30, 45 and 60 minutes before the call. After each pass the
   * `initKinesisMsg` markers are cleared so older passes only fill gaps.
   *
   * @param {string} siteSN - Serial number of the current site.
   * @param {string[]} unitsSNs - Unit serial numbers, in display order.
   * @param {string[]} unitNames - Unit names, index-aligned with `unitsSNs`.
   * @returns {function(function): Promise<void>} Thunk; rejects if any Kinesis
   *   call fails.
   */
  getKinesisData(siteSN, unitsSNs, unitNames) { //Latest
    return async (dispatch) => {
      const moment = require('moment-timezone');
      const initDate = moment();
      console.time('startKinesis');
      try {
        const kinesisObj = new AWS.Kinesis({
          apiVersion: '2013-12-02'
        });
        const shards = await dispatch(kinesis.getShards(kinesisObj));
        const applyRecords = (response) => dispatch(
          kinesis.processData(response, siteSN, unitsSNs, unitNames, dispatch)
        );

        const latestIterators = await dispatch(
          kinesis.getInitShardIterators(kinesisObj, shards, 'LATEST')
        );
        await Promise.all(latestIterators.map(async (iterator) => {
          applyRecords(await kinesisObj.getRecords({ ShardIterator: iterator }));
        }));
        dispatch(kinesis.clearServiceFlag(dispatch));

        // Upper bound of the window being read; each window stops where the
        // previous (newer) one started.
        let endTS = initDate.clone();
        for (const minutesBack of [5, 15, 30, 45, 60]) {
          const start = initDate.clone().add(-minutesBack, 'm');
          const windowEnd = endTS;
          const windowIterators = await dispatch(
            kinesis.getInitShardIterators(kinesisObj, shards, 'AT_TIMESTAMP', start.toDate())
          );
          await Promise.all(windowIterators.map(async (iterator) => {
            let nextIterator = iterator;
            do {
              const response = await kinesisObj.getRecords({
                ShardIterator: nextIterator,
              });
              applyRecords(response);
              // Keep reading while this shard's position is still older than windowEnd.
              const millisSinceWindowEnd = moment().diff(windowEnd, 'seconds') * 1000;
              nextIterator = response.MillisBehindLatest > millisSinceWindowEnd
                ? response.NextShardIterator
                : null;
            } while (nextIterator);
          }));
          // Move the bound only after every shard finished this window; doing it
          // inside the per-shard callback cut the other shards' reads short.
          endTS = start.clone().add(-1, 'ms');
          dispatch(kinesis.clearServiceFlag(dispatch));
        }
      } finally {
        console.timeEnd('startKinesis');
      }
    };
  }
};