Subversion Repositories SmartDukaan

Rev

Rev 31170 | Rev 31340 | Go to most recent revision | View as "text/plain" | Blame | Compare with Previous | Last modification | View Log | RSS feed

package com.spice.profitmandi.service.scheme;

import com.spice.profitmandi.common.enumuration.ItemType;
import com.spice.profitmandi.common.enumuration.MessageType;
import com.spice.profitmandi.common.exception.ProfitMandiBusinessException;
import com.spice.profitmandi.common.model.ProfitMandiConstants;
import com.spice.profitmandi.common.model.SchemeModel;
import com.spice.profitmandi.common.model.SendNotificationModel;
import com.spice.profitmandi.common.services.ReporticoService;
import com.spice.profitmandi.common.util.FormattingUtils;
import com.spice.profitmandi.common.util.StringUtils;
import com.spice.profitmandi.common.util.Utils;
import com.spice.profitmandi.dao.entity.catalog.Item;
import com.spice.profitmandi.dao.entity.catalog.Scheme;
import com.spice.profitmandi.dao.entity.catalog.SchemeRegion;
import com.spice.profitmandi.dao.entity.catalog.SchemesImeisModel;
import com.spice.profitmandi.dao.entity.fofo.*;
import com.spice.profitmandi.dao.entity.transaction.PriceDrop;
import com.spice.profitmandi.dao.enumuration.catalog.AmountType;
import com.spice.profitmandi.dao.enumuration.catalog.SchemeType;
import com.spice.profitmandi.dao.enumuration.fofo.ScanType;
import com.spice.profitmandi.dao.enumuration.transaction.SchemePayoutStatus;
import com.spice.profitmandi.dao.model.CreateSchemeRequest;
import com.spice.profitmandi.dao.repository.catalog.*;
import com.spice.profitmandi.dao.repository.dtr.RetailerRepository;
import com.spice.profitmandi.dao.repository.fofo.*;
import com.spice.profitmandi.dao.repository.transaction.PriceDropRepository;
import com.spice.profitmandi.service.NotificationService;
import com.spice.profitmandi.service.authentication.RoleManager;
import com.spice.profitmandi.service.inventory.PurchaseService;
import com.spice.profitmandi.service.wallet.WalletService;
import in.shop2020.model.v1.order.WalletReferenceType;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.hibernate.Session;
import org.hibernate.SessionFactory;
import org.hibernate.query.Query;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.cache.annotation.Cacheable;
import org.springframework.stereotype.Component;

import javax.persistence.TypedQuery;
import javax.persistence.criteria.CriteriaBuilder;
import javax.persistence.criteria.CriteriaQuery;
import javax.persistence.criteria.Predicate;
import javax.persistence.criteria.Root;
import java.text.MessageFormat;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.YearMonth;
import java.util.*;
import java.util.stream.Collectors;

@Component
public class SchemeServiceImpl implements SchemeService {

        private static final Logger LOGGER = LogManager.getLogger(SchemeServiceImpl.class);

        @Autowired
        @Qualifier("fofoInventoryItemRepository")
        private InventoryItemRepository inventoryItemRepository;

        @Autowired
        private ActivatedImeiRepository activatedImeiRepository;

        @Autowired
        private PartnerTypeChangeService partnerTypeChangeService;

        @Autowired
        private PurchaseService purchaseService;

        @Autowired
        private ScanRecordRepository scanRecordRepository;

        @Autowired
        private SessionFactory sessionFactory;

        private static final Set<Integer> tagIds = new HashSet<Integer>(Arrays.asList(4));

        @Autowired
        private SchemeRepository schemeRepository;

        @Autowired
        private PriceDropRepository priceDropRepository;

        @Autowired
        private RoleManager roleManager;

        @Autowired
        private RetailerRepository retailerRepository;

        @Autowired
        private ReporticoService reporticoService;


        @Autowired
        private TagListingRepository tagListingRepository;

        @Autowired
        private SchemeInOutRepository schemeInOutRepository;

        @Autowired
        @Qualifier("catalogItemRepository")
        private ItemRepository itemRepository;

        @Autowired
        private SchemeItemRepository schemeItemRepository;

        @Autowired
        private SchemeRegionRepository schemeRegionRepository;

        @Autowired
        private WalletService walletService;

        @Autowired
        private PurchaseRepository purchaseRepository;

        @Autowired
        private FofoOrderRepository fofoOrderRepository;

        private static final List<String> BLOCKED_IMEIS = Arrays.asList("864883056397593", "864883054606656", "864883056567815", "861950056518271", "869175055649511", "861362058924574", "866009066803036", "866009066816699", "866009066816137", "866009066815873", "866009066805536", "866009066803010", "866009066821939", "866009066802756", "866009066820592", "866009066820311", "866009066816491", "866009066816376", "866009066815899", "866009066815774", "866009066817937", "866009066819859", "866009066817655", "866009066820691", "866009066820832", "866009066803291", "866009066820733", "866009066814496", "866009066820451", "866009066820659", "866009066804976", "866009066820717", "866009066816095", "861362054898434", "869599051117852", "869599056695332", "869599056695894", "864883057389656", "862661052418692", "860118051929254", "862888051664998", "862680054625831", "862888051666316", "860118051738895", "868066050447970", "868066052424399", "865084051552576", "865084050755097", "865084050755295", "865084050754819", "864883057487278", "864883057389599", "864883057437455", "864883057388278", "862680058278058", "869599056810139", "862200053994193", "861932057188916", "861175050581774", "863933065909093", "863933065635391", "861362054889177", "864004062055154", "864004062069239", "862661050221676", "862661052416993", "866812058631475", "869599051118173", "869599051504273", "868066052729250", "864883057701397", "864883054123033", "864883054947316", "864883056235694", "868066052727692", "866030052139896", "866030052140175", "860588051522053", "860588051513193", "861932056969779", "869599056171995", "865594061074932", "863935059410491", "866088059072718", "869599055375894", "869599054306916", "863782054006472", "863782054012371", "860588053486992", "868066052726835", "868066052726694", "860688053876430", "860688053869674", "868494052222110", "868494054682394", "869599053512357");

        private static final List<SchemeType> ACTIVATION_SCHEME_TYPES = Arrays.asList(SchemeType.ACTIVATION, SchemeType.SPECIAL_SUPPORT);

        @Override
        public void saveScheme(int creatorId, CreateSchemeRequest createSchemeRequest) throws ProfitMandiBusinessException {

                this.validateCreateSchemeRequest(createSchemeRequest);

                Scheme scheme = this.toScheme(creatorId, createSchemeRequest);

                if (scheme.getStartDateTime().isAfter(scheme.getEndDateTime())) {
                        throw new ProfitMandiBusinessException(
                                        ProfitMandiConstants.START_DATE + ", " + ProfitMandiConstants.END_DATE,
                                        scheme.getStartDateTime() + ", " + scheme.getEndDateTime(), "SCHM_VE_1005");
                }

                // this.validateItemIds(createSchemeRequest);
                schemeRepository.persist(scheme);
                for (int catalogId : createSchemeRequest.getCatalogIds()) {
                        SchemeItem schemeItem = new SchemeItem();
                        schemeItem.setSchemeId(scheme.getId());
                        schemeItem.setCatalogId(catalogId);
                        schemeItemRepository.persist(schemeItem);
                }

                for (int regionId : createSchemeRequest.getRegionIds()) {
                        SchemeRegion schemeRegion = new SchemeRegion();
                        schemeRegion.setSchemeId(scheme.getId());
                        schemeRegion.setRegionId(regionId);
                        schemeRegionRepository.persist(schemeRegion);
                }

        }

        private void validateCreateSchemeRequest(CreateSchemeRequest createSchemeRequest)
                        throws ProfitMandiBusinessException {
                if (createSchemeRequest.getName() == null || createSchemeRequest.getName().isEmpty()) {
                        throw new ProfitMandiBusinessException(ProfitMandiConstants.NAME, createSchemeRequest.getName(),
                                        "SCHM_VE_1000");
                }
                if (createSchemeRequest.getAmount() <= 0) {
                        throw new ProfitMandiBusinessException(ProfitMandiConstants.AMOUNT, createSchemeRequest.getAmount(),
                                        "SCHM_VE_1001");
                }

                if (createSchemeRequest.getAmountType().equals(AmountType.PERCENTAGE)
                                && createSchemeRequest.getAmount() > 100) {
                        throw new ProfitMandiBusinessException(ProfitMandiConstants.AMOUNT, createSchemeRequest.getAmount(),
                                        "SCHM_VE_1002");
                }

                if (createSchemeRequest.getStartDate() == null) {
                        throw new ProfitMandiBusinessException(ProfitMandiConstants.START_DATE, createSchemeRequest.getStartDate(),
                                        "SCHM_VE_1003");
                }
                if (createSchemeRequest.getEndDate() == null) {
                        throw new ProfitMandiBusinessException(ProfitMandiConstants.END_DATE, createSchemeRequest.getEndDate(),
                                        "SCHM_VE_1004");
                }
        }

        @Autowired
        StateGstRateRepository stateGstRateRepository;


        private void validateItemIds(CreateSchemeRequest createSchemeRequest) throws ProfitMandiBusinessException {
                if (createSchemeRequest.getCatalogIds() == null || createSchemeRequest.getCatalogIds().isEmpty()) {
                        throw new ProfitMandiBusinessException(ProfitMandiConstants.ITEM_ID, createSchemeRequest.getCatalogIds(),
                                        "SCHM_1003");
                }
                List<Integer> foundItemIds = itemRepository.selectIdsByIdsAndType(createSchemeRequest.getCatalogIds(),
                                ItemType.SERIALIZED);
                if (foundItemIds.size() != createSchemeRequest.getCatalogIds().size()) {
                        createSchemeRequest.getCatalogIds().removeAll(foundItemIds);
                        throw new ProfitMandiBusinessException(ProfitMandiConstants.ITEM_ID, createSchemeRequest.getCatalogIds(),
                                        "SCHM_1004");
                }
        }

        @Override
        public Scheme getSchemeById(int schemeId) throws ProfitMandiBusinessException {
                Scheme scheme = schemeRepository.selectById(schemeId);
                List<Integer> catalogIds = schemeItemRepository.selectCatalogIdsBySchemeId(scheme.getId());
                if (catalogIds.size() > 0) {
                        List<Item> items = itemRepository.selectAllByCatalogIds(new HashSet<>(catalogIds));
                        scheme.setCatalogStringMap(this.toCatalogStringMap(items));
                }
                return scheme;
        }

        public Map<Integer, String> toCatalogStringMap(List<Item> items) {
                Map<Integer, String> catalogMap = new HashMap<>();
                for (Item item : items) {
                        if (!catalogMap.containsKey(item.getCatalogItemId())) {
                                catalogMap.put(item.getCatalogItemId(), item.getItemDescriptionNoColor());
                        }
                }
                return catalogMap;
        }

        private Set<Integer> schemeItemsToCatalogIds(List<SchemeItem> schemeItems) {
                Set<Integer> catalogId = new HashSet<>();
                for (SchemeItem schemeItem : schemeItems) {
                        catalogId.add(schemeItem.getCatalogId());
                }
                return catalogId;
        }

        @Override
        public List<SchemeModel> getAllSchemeModels(LocalDateTime startDateTime, LocalDateTime endDateTime) {
                List<Scheme> schemes = schemeRepository.selectAllBetweenCreateTimestamp(startDateTime, endDateTime);
                Map<Integer, Scheme> schemeIdSchemeMap = schemes.stream().collect(Collectors.toMap(x -> x.getId(), x -> x));
                List<SchemeItem> schemeItems = schemeItemRepository.selectBySchemeIds(schemeIdSchemeMap.keySet());
                Set<Integer> catalogIds = schemeItems.stream().map(x -> x.getCatalogId()).collect(Collectors.toSet());
                List<Item> items = itemRepository.selectAllByCatalogIds(catalogIds);
                Map<Integer, String> catalogStringMap = this.toCatalogStringMap(items);
                this.addCatalogIdsToSchemes(schemeItems, schemeIdSchemeMap, catalogStringMap);
                return this.toSchemeModels(schemeIdSchemeMap);
        }

        @Autowired
        NotificationService notificationService;

        private List<SchemeModel> toSchemeModels(Map<Integer, Scheme> schemeIdSchemeMap) {
                List<SchemeModel> schemeModels = new ArrayList<>();
                for (Map.Entry<Integer, Scheme> schemeIdSchemeEntry : schemeIdSchemeMap.entrySet()) {
                        schemeModels.add(this.toSchemeModel(schemeIdSchemeEntry.getValue()));
                }
                return schemeModels;
        }

        private SchemeModel toSchemeModel(Scheme scheme) {
                SchemeModel schemeModel = new SchemeModel();
                schemeModel.setSchemeId(scheme.getId());
                schemeModel.setName(scheme.getName());
                schemeModel.setDescription(scheme.getDescription());
                schemeModel.setSchemeType(scheme.getType().toString());
                schemeModel.setAmountType(scheme.getAmountType().toString());
                schemeModel.setAmount(scheme.getAmount());
                schemeModel.setStartDateTime(StringUtils.toString(scheme.getStartDateTime()));
                schemeModel.setEndDateTime(StringUtils.toString(scheme.getEndDateTime()));
                schemeModel.setCreateTimestamp(StringUtils.toString(scheme.getCreateTimestamp()));
                schemeModel.setActiveTimestamp(StringUtils.toString(scheme.getActiveTimestamp()));
                schemeModel.setExpireTimestamp(StringUtils.toString(scheme.getExpireTimestamp()));
                schemeModel.setCreatedBy(scheme.getCreatedBy());
                schemeModel.setCatalogStringMap(scheme.getCatalogStringMap());
                schemeModel.setRetailerIds(scheme.getRetailerIds());
                return schemeModel;
        }

        private void addCatalogIdsToSchemes(List<SchemeItem> schemeItems, Map<Integer, Scheme> schemeIdSchemeMap,
                                                                                Map<Integer, String> catalogStringMap) {
                for (SchemeItem schemeItem : schemeItems) {
                        Scheme scheme = schemeIdSchemeMap.get(schemeItem.getSchemeId());
                        scheme.getCatalogStringMap().put(schemeItem.getCatalogId(), catalogStringMap.get(schemeItem.getCatalogId()));
                }
        }

        @Override
        public void activeSchemeById(int schemeId) throws ProfitMandiBusinessException {
                Scheme scheme = schemeRepository.selectById(schemeId);
                if (scheme.getActiveTimestamp() != null) {
                        throw new ProfitMandiBusinessException(ProfitMandiConstants.ACTIVE_TIMESTAMP, scheme.getActiveTimestamp(),
                                        "SCHM_1005");
                }
                if (scheme.getExpireTimestamp() != null) {
                        throw new ProfitMandiBusinessException(ProfitMandiConstants.EXPIRE_TIMESTAMP, scheme.getExpireTimestamp(),
                                        "SCHM_1006");
                }
                scheme.setActiveTimestamp(LocalDateTime.now());
                this.sendSchemeNotification(scheme);


                /*
                 * if (scheme.getType() == SchemeType.IN) {
                 * this.processPreviousPurchases(scheme); } else if (scheme.getType() ==
                 * SchemeType.OUT) { this.processPreviousSales(scheme); }
                 */
        }

        private void sendSchemeNotification(Scheme scheme) throws ProfitMandiBusinessException {
                if (ACTIVATION_SCHEME_TYPES.contains(scheme.getType())) {

                        SendNotificationModel sendNotificationModel = new SendNotificationModel();
                        List<SchemeItem> schemeItems = schemeItemRepository.selectBySchemeIds(Collections.singleton(scheme.getId()));
                        Set<Integer> catalogIds = schemeItems.stream().map(x -> x.getCatalogId()).collect(Collectors.toSet());
                        List<String> itemDescriptions = itemRepository.selectAllByCatalogIds(catalogIds).stream().filter(Utils.distinctByKey(Item::getCatalogItemId))
                                        .map(x -> x.getItemDescriptionNoColor()).collect(Collectors.toList());
                        String titleTemplate = "%s of Rs.%s for %s";
                        String schemeString = "Activation scheme";
                        if (scheme.getType().equals(SchemeType.SPECIAL_SUPPORT)) {
                                schemeString = "Special Support";
                        }

                        String message = "Duration from - " + FormattingUtils.formatDateMonth(scheme.getStartDateTime()) + " - " + FormattingUtils.formatDateMonth(scheme.getEndDateTime());
                        sendNotificationModel.setCampaignName("activationscheme");
                        sendNotificationModel.setUrl("https://app.smartdukaan.com/pages/home/scheme/" + scheme.getId());
                        sendNotificationModel.setExpiresat(LocalDateTime.now().plusDays(1));
                        sendNotificationModel.setMessage(message);
                        sendNotificationModel.setTitle(String.format(titleTemplate, schemeString, FormattingUtils.formatDecimal(scheme.getAmount()), org.apache.commons.lang3.StringUtils.abbreviate(String.join(", ", itemDescriptions), 25)));
                        sendNotificationModel.setType("url");
                        sendNotificationModel.setMessageType(MessageType.scheme);
                        notificationService.sendNotificationToAll(sendNotificationModel);
                }
        }

        @Override
        public void expireSchemeById(int schemeId, LocalDateTime expiryTime) throws ProfitMandiBusinessException {
                Scheme scheme = schemeRepository.selectById(schemeId);
                if (scheme == null || scheme.getActiveTimestamp() == null) {
                        throw new ProfitMandiBusinessException(ProfitMandiConstants.ACTIVE_TIMESTAMP, scheme.getActiveTimestamp(),
                                        "SCHM_1007");
                }
                if (scheme.getExpireTimestamp() != null) {
                        throw new ProfitMandiBusinessException(ProfitMandiConstants.EXPIRE_TIMESTAMP, scheme.getExpireTimestamp(),
                                        "SCHM_1008");
                }
                scheme.setExpireTimestamp(LocalDateTime.now());
                if (expiryTime.isAfter(scheme.getEndDateTime())) {
                        throw new ProfitMandiBusinessException(ProfitMandiConstants.EXPIRE_TIMESTAMP, scheme.getExpireTimestamp(),
                                        "End Date cant be extended during expiry");
                }
                scheme.setEndDateTime(expiryTime);
                schemeRepository.persist(scheme);
        }

        private Map<Integer, Scheme> toSchemeIdSchemeMap(List<Scheme> schemes) {
                Map<Integer, Scheme> schemeIdSchemeMap = new HashMap<>();
                for (Scheme scheme : schemes) {
                        schemeIdSchemeMap.put(scheme.getId(), scheme);
                }
                return schemeIdSchemeMap;
        }

        private Map<Integer, Set<Scheme>> toCatalogIdSchemesMap(List<SchemeItem> schemeItems, List<Scheme> schemes) {
                Map<Integer, Scheme> schemesMap = schemes.stream().collect(Collectors.toMap(x -> x.getId(), x -> x));
                Map<Integer, Set<Scheme>> catalogSchemesMap = new HashMap<>();
                for (SchemeItem schemeItem : schemeItems) {
                        if (!catalogSchemesMap.containsKey(schemeItem.getCatalogId())) {
                                catalogSchemesMap.put(schemeItem.getCatalogId(), new HashSet<>());
                        }
                        Set<Scheme> schemesSet = catalogSchemesMap.get(schemeItem.getCatalogId());
                        schemesSet.add(schemesMap.get(schemeItem.getSchemeId()));
                }
                return catalogSchemesMap;
        }

        private Map<InventoryItem, Set<Scheme>> toInventoryItemSchemesMap(List<Scheme> schemes,
                                                                                                                                          List<InventoryItem> inventoryItems) {
                Set<Integer> schemeIds = schemes.stream().map(x -> x.getId()).collect(Collectors.toSet());
                Set<Integer> itemIds = inventoryItems.stream().map(x -> x.getItemId()).collect(Collectors.toSet());
                Set<Integer> catalogIds = itemRepository.selectByIds(itemIds).stream().map(x -> x.getCatalogItemId()).collect(Collectors.toSet());
                List<SchemeItem> schemeItems = schemeItemRepository.selectBySchemeIdsAndCatalogIds(schemeIds, catalogIds);

                Map<Integer, Set<Scheme>> catalogIdSchemesMap = this.toCatalogIdSchemesMap(schemeItems, schemes);
                Map<InventoryItem, Set<Scheme>> inventoryItemSchemesMap = new HashMap<>();
                for (InventoryItem inventoryItem : inventoryItems) {
                        LOGGER.info("inventoryItem {}", inventoryItem);
                        LOGGER.info("inventoryItem.getItem() {}", inventoryItem.getItem());
                        LOGGER.info("catalogIdSchemesMap {}", catalogIdSchemesMap);
                        if (catalogIdSchemesMap.containsKey(inventoryItem.getItem().getCatalogItemId())) {
                                inventoryItemSchemesMap.put(inventoryItem, catalogIdSchemesMap.get(inventoryItem.getItem().getCatalogItemId()));
                        }
                }
                return inventoryItemSchemesMap;
        }

        @Override
        public void processSchemeIn(int purchaseId, int retailerId) throws ProfitMandiBusinessException {
                LOGGER.info("Trying to process SchemeIn with purchaseId [{}] and retailerId [{}]", purchaseId, retailerId);
                Purchase purchase = purchaseRepository.selectByIdAndFofoId(purchaseId, retailerId);
                // TODO - SCHEME
                PartnerType partnerType = partnerTypeChangeService.getTypeOnMonth(retailerId,
                                YearMonth.from(purchase.getCreateTimestamp()));
                // PartnerType partnerType = partnerTypeChangeService.getTypeOnDate(retailerId,
                // purchase.getCreateTimestamp().toLocalDate());

                List<Scheme> schemes = schemeRepository.selectActiveAll(Arrays.asList(SchemeType.IN), partnerType,
                                purchase.getCreateTimestamp().toLocalDate(), false);
                List<Integer> validSchemeIds = schemeRepository.selectSchemesByRetailerIdsSchemeIds(retailerId, schemes.stream().map(x -> x.getId()).collect(Collectors.toList()));
                schemes = schemes.stream().filter(x -> validSchemeIds.contains(x.getId())).collect(Collectors.toList());
                float totalCashback = 0;

                if (schemes.isEmpty()) {
                        return;
                }
                List<InventoryItem> inventoryItems = inventoryItemRepository.selectByPurchaseId(purchaseId);
                //Remove imeis from blocked imeis list
                inventoryItems = inventoryItems.stream().filter(inventoryItem -> BLOCKED_IMEIS.contains(inventoryItem.getSerialNumber())).collect(Collectors.toList());

                Set<Integer> itemIds = inventoryItems.stream().map(x -> x.getItemId()).collect(Collectors.toSet());
                Map<Integer, Item> itemsMap = itemRepository.selectByIds(itemIds).stream().collect(Collectors.toMap(x -> x.getId(), x -> x));
                inventoryItems.stream().forEach(x -> x.setItem(itemsMap.get(x.getItemId())));

                LocalDateTime billingDate = purchaseService.getBillingDateOfPurchase(purchaseId);
                Set<Integer> itemIdsSet = tagListingRepository.selectByItemIdsAndTagIds(itemIds, tagIds).stream()
                                .filter(x -> x.getEolDate() == null || x.getEolDate().isAfter(billingDate)).map(x -> x.getItemId())
                                .collect(Collectors.toSet());
                // Only consider inventory items that were not returned
                inventoryItems = inventoryItems.stream().filter(x -> itemIdsSet.contains(x.getItemId()))
                                .filter(x -> !x.getLastScanType().equals(ScanType.PURCHASE_RET_BAD))
                                .filter(x -> !x.getLastScanType().equals(ScanType.PURCHASE_RET)).collect(Collectors.toList());
                LOGGER.info(inventoryItems);
                if (inventoryItems.size() == 0)
                        return;
                Map<InventoryItem, Set<Scheme>> inventoryItemSchemesMap = this.toInventoryItemSchemesMap(schemes,
                                inventoryItems);

                if (inventoryItemSchemesMap.isEmpty()) {
                        return;
                }
                Map<InventoryItem, Set<Scheme>> allInventoryItemSchemesMap = new HashMap<>();

                for (Map.Entry<InventoryItem, Set<Scheme>> inventoryItemSchemesEntry : inventoryItemSchemesMap.entrySet()) {
                        Set<Scheme> allSchemes = new HashSet<>();
                        for (Scheme scheme : inventoryItemSchemesEntry.getValue()) {
                                allSchemes.add(scheme);
                        }
                        allInventoryItemSchemesMap.put(inventoryItemSchemesEntry.getKey(), allSchemes);
                }

                //

                int itemsCount = 0;
                for (Map.Entry<InventoryItem, Set<Scheme>> allInventoryItemSchemesEntry : allInventoryItemSchemesMap
                                .entrySet()) {
                        float inventoryItemCashback = 0;
                        for (Scheme scheme : allInventoryItemSchemesEntry.getValue()) {
                                InventoryItem inventoryItem = allInventoryItemSchemesEntry.getKey();
                                float cashback = this.createSchemeInOut(scheme, inventoryItem);
                                inventoryItemCashback += cashback;
                        }
                        if (inventoryItemCashback > 0) {
                                totalCashback += inventoryItemCashback;
                                itemsCount++;
                        }
                }

                LOGGER.info("Items count for purchase id {} is {}", purchaseId, itemsCount);
                if (itemsCount > 0) {
                        walletService.addAmountToWallet(
                                        retailerId, purchaseId, WalletReferenceType.SCHEME_IN, "Added for SCHEME IN against invoice "
                                                        + purchase.getPurchaseReference() + " (total " + itemsCount + " pcs)",
                                        totalCashback, purchase.getCreateTimestamp());
                        LOGGER.info("Added Rs.{} for SCHEME IN against invoice {} total pcs({}) {}", totalCashback,
                                        purchase.getPurchaseReference(), itemsCount);
                        purchase.setCashback(purchase.getCashback() + totalCashback);
                        purchaseRepository.persist(purchase);
                }
        }

        private Scheme toScheme(int creatorId, CreateSchemeRequest createSchemeRequest) {
                Scheme scheme = new Scheme();
                scheme.setName(createSchemeRequest.getName());
                scheme.setDescription(createSchemeRequest.getDescription());
                scheme.setType(createSchemeRequest.getType());
                scheme.setAmountType(createSchemeRequest.getAmountType());
                scheme.setAmount(createSchemeRequest.getAmount());
                scheme.setPartnerType(createSchemeRequest.getPartnerType());
                scheme.setStartDateTime(createSchemeRequest.getStartDate());
                scheme.setEndDateTime(createSchemeRequest.getEndDate());
                scheme.setCreatedBy(creatorId);
                scheme.setCashback(createSchemeRequest.isCashback());
                return scheme;
        }

        //Only in and activation margins are allowed to be rolled out more than twice
        private float createSchemeInOut(Scheme scheme, InventoryItem inventoryItem) throws ProfitMandiBusinessException {
                LOGGER.info("Scheme === {}", scheme);
                if ((scheme.getId() == 411 || scheme.getId() == 612) && inventoryItem.getCreateTimestamp().isAfter(LocalDate.of(2021, 12, 1).atStartOfDay())) {
                        return 0;
                }
                List<SchemeInOut> sios = schemeInOutRepository.selectAllByType(scheme.getType(), inventoryItem.getId());
                float actualCredit = 0;
                if (sios.stream().filter(x -> x.getRolledBackTimestamp() == null && x.getSchemeId() == scheme.getId())
                                .collect(Collectors.toList()).size() == 0) {
                        sios = sios.stream().filter(x -> x.getRolledBackTimestamp() == null).collect(Collectors.toList());
                        //Rejected Scheme for types INVESTMENT and ACTIVATION are considered rolledback only if the item billing is cancelled.
                        float amountCredited = (float) sios.stream().mapToDouble(e -> e.getAmount()).sum();

                        LOGGER.info("SIOS ===== {}", sios);
                        float amountToCredit = this.getAmount(inventoryItem, scheme);
                        //Activation and in scheme
                        if (!scheme.getType().equals(SchemeType.IN) && !scheme.getType().equals(SchemeType.ACTIVATION) &&
                                        !scheme.getType().equals(SchemeType.SPECIAL_SUPPORT) && sios.size() > 0) {

                                if (sios.size() > 1) {
                                        LOGGER.info("SAMESCHEMETYPE has already been credited twice for inventoryItem - {}", inventoryItem.getId());
                                        return 0;
                                }
                                if (amountToCredit > amountCredited + 1f) {
                                        for (SchemeInOut sio : sios) {
                                                sio.setRolledBackTimestamp(LocalDateTime.now());
                                                sio.setStatus(SchemePayoutStatus.REJECTED);
                                                sio.setStatusDescription("Partner Category upgraded to " + scheme.getPartnerType() + ", new entry for margin added");
                                        }
                                        actualCredit = amountToCredit - amountCredited;
                                } else {
                                        return 0;            //Rejected Scheme for types INVESTMENT and ACTIVATION are considered rolledback only if the item billing is cancelled.

                                }
                        } else {
                                actualCredit = amountToCredit;
                        }
                        LOGGER.info("Actual Credit ==== {}", actualCredit);

                        SchemeInOut schemeInOut = new SchemeInOut();
                        schemeInOut.setSchemeId(scheme.getId());
                        schemeInOut.setInventoryItemId(inventoryItem.getId());
                        schemeInOut.setAmount(amountToCredit);
                        schemeInOutRepository.persist(schemeInOut);

                        if (scheme.getType().equals(SchemeType.ACTIVATION)) {
                                schemeInOut.setStatus(SchemePayoutStatus.PENDING);
                                schemeInOut.setStatusDescription("Activation pending for IMEI#" + inventoryItem.getSerialNumber());
                                return 0;
                        } else if (scheme.getType().equals(SchemeType.INVESTMENT)) {
                                schemeInOut.setStatus(SchemePayoutStatus.PENDING);
                                schemeInOut.setStatusDescription("Subject to investment days maintained");
                                return 0;
                        } else {
                                schemeInOut.setStatus(SchemePayoutStatus.CREDITED);
                                schemeInOut.setCreditTimestamp(LocalDateTime.now());
                                if (scheme.getType().equals(SchemeType.IN)) {
                                        schemeInOut.setStatusDescription("Credited for GRN of IMEI#" + inventoryItem.getSerialNumber());
                                } else if (SchemeService.OUT_SCHEME_TYPES.contains(scheme.getType())) {
                                        schemeInOut.setStatusDescription("Credited for sale of IMEI#" + inventoryItem.getSerialNumber());
                                }
                        }
                }
                return actualCredit;
        }

        // We are maintaining price drop after grn
        private float getAmount(InventoryItem inventoryItem, Scheme scheme) throws ProfitMandiBusinessException {
                if (BLOCKED_IMEIS.contains(inventoryItem.getSerialNumber())) {
                        return 0;
                }
                float amount = 0;
                float dpForCalc = 0;
                float taxableSellingPrice = 0;

                //float totalTaxRate = stateGstRateRepository.getTotalTaxRate(inventoryItem.getItemId());
                if (scheme.getAmountType().equals(AmountType.PERCENTAGE)) {
                        if (scheme.getType().equals(SchemeType.IN)) {
                                dpForCalc = inventoryItem.getUnitPrice() - inventoryItem.getPriceDropAmount();
                        } else {
                                try {
                                        dpForCalc = Math.min(inventoryItem.getUnitPrice() - inventoryItem.getPriceDropAmount(),
                                                        tagListingRepository.selectByItemId(inventoryItem.getItemId()).getSellingPrice());
                                } catch (Exception e) {
                                        LOGGER.info("Could not find tag Listing entry in {}", inventoryItem.getItemId());
                                        e.printStackTrace();
                                }
                        }
                        //TODO:Should be calculated on unit price
                        //taxableSellingPrice = dpForCalc / (1 + totalTaxRate / 100);
                        //amount = taxableSellingPrice * scheme.getAmount() / 100;
                        amount = dpForCalc * scheme.getAmount() / 100;
                        System.out.println(String.format("%d\t%s\t%d\t%d\t%s\t%s\t%s\t%s\t%f\t%f\t%f\t%f", inventoryItem.getId(),
                                        inventoryItem.getSerialNumber(), inventoryItem.getItemId(), scheme.getId(), scheme.getName(),
                                        scheme.getType(), scheme.getAmountType(), scheme.getPartnerType(), dpForCalc, taxableSellingPrice,
                                        scheme.getAmount(), amount));
                } else if (scheme.getType().equals(SchemeType.IN)) {
                        amount = scheme.getAmount();
                }
                return amount;
        }

        @Override
        public float processSchemeOut(int fofoOrderId, int retailerId) throws ProfitMandiBusinessException {
                float totalCashback = 0;
                FofoOrder fofoOrder = fofoOrderRepository.selectByFofoIdAndOrderId(retailerId, fofoOrderId);
                // Process only if order is not cancelled
                if (fofoOrder.getCancelledTimestamp() == null) {
                        // PartnerType partnerType = partnerTypeChangeService.getTypeOnDate(retailerId,
                        // fofoOrder.getCreateTimestamp().toLocalDate());
                        // TODO - SCHEME
                        PartnerType partnerType = partnerTypeChangeService.getTypeOnMonth(retailerId,
                                        YearMonth.from(fofoOrder.getCreateTimestamp()));

                        List<ScanRecord> scanRecords = scanRecordRepository.selectAllByOrderId(fofoOrderId);
                        if (scanRecords.size() == 0) return 0;
                        Set<Integer> inventoryItemIds = scanRecords.stream().map(x -> x.getInventoryItemId())
                                        .collect(Collectors.toSet());
                        LOGGER.info("fofoOrderId --- {}", fofoOrderId);
                        LOGGER.info("scanRecords --- {}", scanRecords);
                        LOGGER.info("inventoryItemIds --- {}", inventoryItemIds);
                        Set<InventoryItem> inventoryItems = inventoryItemRepository.selectByIds(inventoryItemIds).stream()
                                        .filter(x -> x.getSerialNumber() != null && !x.getSerialNumber().equals(""))
                                        .collect(Collectors.toSet());
                        inventoryItems = inventoryItems.stream().filter(inventoryItem -> BLOCKED_IMEIS.contains(inventoryItem.getSerialNumber())).collect(Collectors.toSet());
                        if (inventoryItems.size() == 0) {
                                return 0;
                        }
                        Set<Integer> itemIds = inventoryItems.stream().map(x -> x.getItemId()).collect(Collectors.toSet());

                        // Remove Items that are eol now.
                        Set<Integer> itemIdsSet = tagListingRepository.selectByItemIdsAndTagIds(itemIds, tagIds).stream()
                                        .filter(x -> x.getEolDate() == null || x.getEolDate().isAfter(fofoOrder.getCreateTimestamp()))
                                        .map(x -> x.getItemId()).collect(Collectors.toSet());
                        // Only consider inventory items that were not returned
                        //ItemCriteria itemCriteria = new ItemCriteria();
                        //itemCriteria.setItemIds(new ArrayList<>(itemIdsSet));
                        //List<Integer> catalogIds = itemRepository.getCatalogIds(itemCriteria);
                        inventoryItems = inventoryItems.stream().filter(x -> itemIdsSet.contains(x.getItemId()))
                                        .collect(Collectors.toSet());

                        if (inventoryItems.size() == 0) {
                                return 0;
                        }

                        int count = 0;

                        List<SchemeType> allOutSchemeTypes = new ArrayList<>();
                        allOutSchemeTypes.addAll(Arrays.asList(SchemeType.ACTIVATION, SchemeType.INVESTMENT, SchemeType.SPECIAL_SUPPORT));
                        allOutSchemeTypes.addAll(OUT_SCHEME_TYPES);
                        List<Scheme> allActiveSchemes = schemeRepository.selectActiveAll(allOutSchemeTypes, partnerType,
                                        fofoOrder.getCreateTimestamp().toLocalDate(), false);
                        List<Integer> validSchemeIds = schemeRepository.selectSchemesByRetailerIdsSchemeIds(retailerId, allActiveSchemes.stream().map(x -> x.getId()).collect(Collectors.toList()));
                        allActiveSchemes = allActiveSchemes.stream().filter(x -> validSchemeIds.contains(x.getId())).collect(Collectors.toList());
                        for (InventoryItem inventoryItem : inventoryItems) {
                                float itemCashback = 0;
                                Set<Integer> schemeIds = new HashSet<>(
                                                schemeItemRepository.selectSchemeIdByCatalogId(inventoryItem.getItem().getCatalogItemId()));
                                List<Scheme> itemActiveSchemes = allActiveSchemes.stream().filter(x -> schemeIds.contains(x.getId()))
                                                .collect(Collectors.toList());
                                List<Scheme> supportSchemes = itemActiveSchemes.stream().filter(x -> Arrays.asList(SchemeType.SPECIAL_SUPPORT, SchemeType.ACTIVATION).contains(x.getType())).collect(Collectors.toList());
                                itemActiveSchemes = itemActiveSchemes.stream().filter(x -> !(x.getType().equals(SchemeType.SPECIAL_SUPPORT))).collect(Collectors.toList());
                                for (Scheme scheme : itemActiveSchemes) {
                                        LOGGER.info("Scheme ==== {}", scheme);
                                        itemCashback += this.createSchemeInOut(scheme, inventoryItem);
                                }
                                if (supportSchemes.size() > 0) {
                                        this.processSpecialSupport(fofoOrder, supportSchemes, inventoryItem, partnerType, fofoOrder.getCreateTimestamp());
                                }
                                LOGGER.info("itemCashback ==== {}", itemCashback);
                                if (itemCashback > 0) {
                                        count++;
                                        totalCashback += itemCashback;
                                }
                        }
                        if (count > 0) {
                                walletService.addAmountToWallet(
                                                retailerId, fofoOrderId, WalletReferenceType.SCHEME_OUT, "Sales margin for invoice number "
                                                                + fofoOrder.getInvoiceNumber() + ". Total " + count + " pc(s)",
                                                totalCashback, fofoOrder.getCreateTimestamp());
                                fofoOrder.setCashback(totalCashback + fofoOrder.getCashback());
                        }
                }
                return totalCashback;
        }

        @Override
        //Tax rate has been passed to 0 to ensure no tax deduction
        public float getSpecialSupportAmount(float supportAmount, PartnerType partnerType, LocalDate onDate,
                                                                                 int catalogId) throws ProfitMandiBusinessException {
                //int itemId = itemRepository.selectAllByCatalogItemId(catalogId).stream().findAny().get().getId();
                //float totalTaxRate = stateGstRateRepository.getTotalTaxRate(itemId);
                return this.getSpecialSupportAmount(supportAmount, partnerType, onDate, catalogId, 0);
        }

        @Override
        public float getSpecialSupportAmount(float supportAmount, PartnerType partnerType, LocalDate onDate,
                                                                                 int catalogId, float taxRate) throws ProfitMandiBusinessException {
                float totalMargin = this.selectPercentageScheme(partnerType, onDate, catalogId, false, 0, 0).stream().collect(Collectors.summingDouble(x -> x.getAmount())).floatValue();
                float amountToCredit = supportAmount * (1 - (totalMargin / (100 + taxRate)));
                return amountToCredit;
        }

        private void processSpecialSupport(FofoOrder fofoOrder, List<Scheme> supportSchemes, InventoryItem
                        inventoryItem, PartnerType partnerType, LocalDateTime saleDate) throws ProfitMandiBusinessException {
                int catalogId = inventoryItem.getItem().getCatalogItemId();
                float totalMargin = this.selectPercentageScheme(partnerType, saleDate.toLocalDate(), catalogId, false, 0, 0).stream().collect(Collectors.summingDouble(x -> x.getAmount())).floatValue();
                LOGGER.info("total percentage margin - {}", totalMargin);
                for (Scheme scheme : supportSchemes) {
                        float amountToCredit = scheme.getAmount() * (1 - (totalMargin / 100));
                        List<SchemeInOut> schemeInOuts = schemeInOutRepository.selectByScheme(scheme.getId(), inventoryItem.getId());
                        SchemeInOut schemeInOut = schemeInOuts.stream().filter(x -> x.getRolledBackTimestamp() == null).findFirst().orElse(null);
                        if (schemeInOut == null) {
                                schemeInOut = new SchemeInOut();
                                schemeInOut.setSchemeId(scheme.getId());
                                schemeInOut.setInventoryItemId(inventoryItem.getId());
                                schemeInOut.setCreateTimestamp(LocalDateTime.now());
                                schemeInOut.setAmount(amountToCredit);
                                schemeInOut.setStatus(SchemePayoutStatus.PENDING);
                                schemeInOut.setStatusDescription("Special support, Activation pending for IMEI#" + inventoryItem.getSerialNumber());
                                schemeInOutRepository.persist(schemeInOut);
                        } else if (Double.valueOf(schemeInOut.getAmount()).intValue() != Double.valueOf(amountToCredit).intValue()) {
                                SchemeInOut schemeInOutNew = new SchemeInOut();
                                schemeInOutNew.setInventoryItemId(inventoryItem.getId());
                                schemeInOutNew.setSchemeId(scheme.getId());
                                schemeInOutNew.setCreateTimestamp(LocalDateTime.now());
                                schemeInOutNew.setAmount(amountToCredit);
                                if (schemeInOut.getStatus().equals(SchemePayoutStatus.PENDING)) {
                                        schemeInOutNew.setStatus(SchemePayoutStatus.PENDING);
                                        schemeInOutNew.setStatusDescription("Special support, Activation pending for IMEI#" + inventoryItem.getSerialNumber());
                                        schemeInOutRepository.persist(schemeInOutNew);
                                } else if (schemeInOut.getStatus().equals(SchemePayoutStatus.CREDITED)) {
                                        schemeInOutNew.setStatus(SchemePayoutStatus.CREDITED);
                                        schemeInOutNew.setCreditTimestamp(LocalDateTime.now());
                                        schemeInOutNew.setStatusDescription("Special support credited");
                                        schemeInOutRepository.persist(schemeInOutNew);
                                        walletService.addAmountToWallet(inventoryItem.getFofoId(), fofoOrder.getId(), WalletReferenceType.SPECIAL_SUPPORT,
                                                        "Special support adjusted against overall margin gains for Imei - " + inventoryItem.getSerialNumber(), amountToCredit - schemeInOut.getAmount(),
                                                        fofoOrder.getCreateTimestamp());

                                }
                                schemeInOut.setStatus(SchemePayoutStatus.REJECTED);
                                schemeInOut.setStatusDescription("Failed!!, New Margin Entry added");
                                schemeInOut.setRolledBackTimestamp(LocalDateTime.now());
                        }

                }

        }

        @Override
        public void rollbackSchemes(List<Integer> inventoryItemIds, int rollbackReference, String rollbackReason)
                        throws Exception {
                Set<Integer> inventoryItemIdSet = new HashSet<>(inventoryItemIds);
                float amountToRollback = 0;
                List<SchemeInOut> schemes = schemeInOutRepository.selectByInventoryItemIds(inventoryItemIdSet);
                for (SchemeInOut schemeInOut : schemes) {
                        if (schemeInOut.getRolledBackTimestamp() == null) {
                                schemeInOut.setRolledBackTimestamp(LocalDateTime.now());
                                if (schemeInOut.getStatus() == null || schemeInOut.getStatus().equals(SchemePayoutStatus.CREDITED)) {
                                        amountToRollback += schemeInOut.getAmount();
                                }
                                schemeInOut.setStatus(SchemePayoutStatus.REJECTED);
                                schemeInOut.setStatusDescription(rollbackReason);
                        }
                }
                if (amountToRollback > 0) {
                        int inventoryItemId = inventoryItemIds.get(0);
                        InventoryItem ii = inventoryItemRepository.selectById(inventoryItemId);
                        Integer fofoId = ii.getFofoId();
                        // Purchase p = purchaseRepository.selectById(ii.getPurchaseId());
                        // TODO//
                        walletService.rollbackAmountFromWallet(fofoId, amountToRollback, ii.getPurchaseId(),
                                        WalletReferenceType.SCHEME_IN, rollbackReason, LocalDateTime.now());
                }
        }

        @Override
        public Map<String, Object> getSchemes(Set<Integer> roleIds, int offset, int limit)
                        throws ProfitMandiBusinessException {
                Map<String, Object> map = new HashMap<>();
                List<Scheme> schemes = null;
                long size = 0;
                if (roleManager.isAdmin(roleIds)) {
                        schemes = schemeRepository.selectAll(offset, limit);
                        size = schemeRepository.selectAllCount();
                } else {
                        schemes = schemeRepository.selectActiveAll(offset, limit);
                        size = schemeRepository.selectAllActiveCount();
                }
                map.put("schemes", schemes);
                map.put("start", offset + 1);
                map.put("size", size);
                if (schemes.size() < limit) {
                        map.put("end", offset + schemes.size());
                } else {
                        map.put("end", offset + limit);
                }
                return map;
        }

        @Override
        public List<Scheme> getPaginatedSchemes(Set<Integer> roleIds, int offset, int limit)
                        throws ProfitMandiBusinessException {
                LOGGER.info("requested offset=[{}], limit = [{}]", offset, limit);
                List<Scheme> schemes = null;
                if (roleManager.isAdmin(roleIds)) {
                        schemes = schemeRepository.selectAll(offset, limit);
                } else {
                        schemes = schemeRepository.selectActiveAll(offset, limit);
                }
                return schemes;
        }

        @Override
        // This is being called to reverse schemes while processing price Drop
        public void reverseSchemes(List<InventoryItem> inventoryItems, int priceDropId, String reversalReason)
                        throws ProfitMandiBusinessException {
                PriceDrop priceDrop = priceDropRepository.selectById(priceDropId);
                Map<Integer, List<InventoryItem>> purchaseInventoryListMap = inventoryItems.stream()
                                .collect(Collectors.groupingBy(InventoryItem::getPurchaseId, Collectors.toList()));

                for (Map.Entry<Integer, List<InventoryItem>> purchaseEntry : purchaseInventoryListMap.entrySet()) {
                        float amountToCredit = 0;
                        float amountToDebit = 0;
                        int purchaseId = purchaseEntry.getKey();
                        List<InventoryItem> purchaseInventoryItemList = purchaseEntry.getValue();

                        Map<Integer, InventoryItem> inventoryItemsMap = purchaseInventoryItemList.stream()
                                        .collect(Collectors.toMap(x -> x.getId(), x -> x));

                        List<SchemeInOut> schemeInOuts = schemeInOutRepository.selectByInventoryItemIds(inventoryItemsMap.keySet());
                        LOGGER.info("Scheme InOuts , {}", schemeInOuts);
                        if (schemeInOuts.size() == 0) {
                                continue;
                        }
                        List<Integer> schemeIds = schemeInOuts.stream().map(x -> x.getSchemeId()).collect(Collectors.toList());
                        Map<Integer, Scheme> schemesMap = schemeRepository.selectBySchemeIds(schemeIds, 0, schemeIds.size())
                                        .stream().collect(Collectors.toMap(x -> x.getId(), x -> x));
                        for (SchemeInOut schemeInOut : schemeInOuts) {
                                InventoryItem ii = inventoryItemsMap.get(schemeInOut.getInventoryItemId());
                                Scheme scheme = schemesMap.get(schemeInOut.getSchemeId());
                                if (scheme.getAmountType().equals(AmountType.FIXED)) {
                                        continue;
                                }
                                if (scheme.getType().equals(SchemeType.IN) && schemeInOut.getRolledBackTimestamp() == null) {
                                        float newAmount = getAmount(ii, scheme);
                                        if (Math.abs(schemeInOut.getAmount() - newAmount) >= 0.01f) {
                                                schemeInOut.setRolledBackTimestamp(LocalDateTime.now());

                                                SchemeInOut sioNew = new SchemeInOut();
                                                sioNew.setAmount(newAmount);
                                                sioNew.setStatus(schemeInOut.getStatus());
                                                sioNew.setStatusDescription(schemeInOut.getStatusDescription());
                                                sioNew.setInventoryItemId(schemeInOut.getInventoryItemId());
                                                sioNew.setSchemeId(schemeInOut.getSchemeId());
                                                sioNew.setCreditTimestamp(LocalDateTime.now());
                                                schemeInOutRepository.persist(sioNew);

                                                schemeInOut.setStatus(SchemePayoutStatus.REJECTED);
                                                schemeInOut.setStatusDescription("Change in margins due to price drop");
                                                // IF not credited then dont consider any credit/debit for that sio entry
                                                if (schemeInOut.getCreditTimestamp() != null) {
                                                        amountToCredit += sioNew.getAmount();
                                                        amountToDebit += schemeInOut.getAmount();
                                                }
                                        }

                                }
                        }
                        int fofoId = inventoryItems.get(0).getFofoId();
                        if (amountToDebit > 0) {
                                walletService.addAmountToWallet(fofoId, purchaseId, WalletReferenceType.SCHEME_IN,
                                                MessageFormat.format(reversalReason, purchaseInventoryItemList.size()), -amountToDebit,
                                                priceDrop.getAffectedOn());
                        }
                        if (amountToCredit > 0) {
                                walletService.addAmountToWallet(fofoId, purchaseId, WalletReferenceType.SCHEME_IN,
                                                MessageFormat.format(reversalReason, purchaseInventoryItemList.size()), amountToCredit,
                                                priceDrop.getAffectedOn());
                        }
                }
        }

        @Override
        // Always being called from cancel order/bad return means no SCHEME IN is considered
        public void reverseSchemes(List<InventoryItem> inventoryItems, int reversalReference, String reversalReason,
                                                           List<SchemeType> schemeTypes) throws ProfitMandiBusinessException {
                Map<Integer, InventoryItem> inventoryItemsMap = inventoryItems.stream()
                                .collect(Collectors.toMap(x -> x.getId(), x -> x));
                LOGGER.info("inventoryItems" + inventoryItems);

                Map<SchemeType, SchemeInOut> schemeTypeMap = new HashMap<>();

                List<SchemeInOut> schemeInOuts = schemeInOutRepository.selectByInventoryItemIds(inventoryItemsMap.keySet());
                LOGGER.info("schemeInOuts" + schemeInOuts);
                float amountToRollback = 0;

                if (!schemeInOuts.isEmpty()) {
                        List<Integer> schemeIds = schemeInOuts.stream().map(x -> x.getSchemeId()).collect(Collectors.toList());
                        LOGGER.info("schemeIds" + schemeIds);

                        Map<Integer, Scheme> schemesMap = schemeRepository.selectBySchemeIds(schemeIds, 0, schemeIds.size())
                                        .stream().collect(Collectors.toMap(x -> x.getId(), x -> x));
                        for (SchemeInOut schemeInOut : schemeInOuts) {
                                Scheme scheme = schemesMap.get(schemeInOut.getSchemeId());
                                if (schemeTypes.contains(scheme.getType())) {
                                        schemeTypeMap.put(scheme.getType(), schemeInOut);
                                        if (schemeInOut.getRolledBackTimestamp() == null) {
                                                schemeInOut.setRolledBackTimestamp(LocalDateTime.now());
                                                if (schemeInOut.getStatus().equals(SchemePayoutStatus.CREDITED)) {
                                                        amountToRollback += schemeInOut.getAmount();
                                                }
                                                schemeInOut.setStatus(SchemePayoutStatus.REJECTED);
                                                schemeInOut.setStatusDescription(reversalReason);
                                        }
                                }
                        }

                }
                int fofoId = inventoryItems.get(0).getFofoId();
                WalletReferenceType walletReferenceType = schemeTypes.containsAll(SchemeService.OUT_SCHEME_TYPES)
                                ? WalletReferenceType.SCHEME_OUT
                                : (schemeTypes.contains(SchemeType.ACTIVATION) ? WalletReferenceType.ACTIVATION_SCHEME
                                : (schemeTypes.contains(SchemeType.SPECIAL_SUPPORT)) ? WalletReferenceType.SPECIAL_SUPPORT : WalletReferenceType.INVESTMENT_PAYOUT);
                if (amountToRollback > 0) {
                        // Mark appropriate reference of rollback investment margin
                        if (schemeTypes.contains(SchemeType.INVESTMENT)) {
                                reversalReference = Integer
                                                .parseInt(FormattingUtils.getYearMonth(schemeTypeMap.get(SchemeType.INVESTMENT).getCreditTimestamp().minusMonths(1)));
                        }
                        walletService.rollbackAmountFromWallet(fofoId, amountToRollback, reversalReference, walletReferenceType,
                                        reversalReason, LocalDateTime.now());
                }
        }

        @Override
        public double getTotalMargin(int itemId, PartnerType partnerType, LocalDateTime dateTime) {
                Session session = sessionFactory.getCurrentSession();
                CriteriaBuilder cb = session.getCriteriaBuilder();
                CriteriaQuery<Double> criteriaQuery = cb.createQuery(Double.class);
                Root<SchemeItem> schemeItem = criteriaQuery.from(SchemeItem.class);
                Root<Scheme> scheme = criteriaQuery.from(Scheme.class);
                Predicate schemePredicate = cb.equal(scheme.get(ProfitMandiConstants.AMOUNT_TYPE), AmountType.PERCENTAGE);
                Predicate lessThanPredicate = cb.lessThanOrEqualTo(scheme.get(ProfitMandiConstants.END_DATE_TIME), dateTime);
                Predicate greaterThanPredicate = cb.greaterThanOrEqualTo(scheme.get(ProfitMandiConstants.START_DATE_TIME),
                                dateTime);
                Predicate joinPredicate = cb.equal(scheme.get("id"), schemeItem.get("schemeId"));
                Predicate schemeItemPredicate = cb.equal(schemeItem.get(ProfitMandiConstants.ITEM_ID), itemId);
                criteriaQuery.select(cb.sum(scheme.get(ProfitMandiConstants.AMOUNT))).where(schemePredicate, lessThanPredicate,
                                greaterThanPredicate, schemeItemPredicate, joinPredicate);

                Query<Double> query = session.createQuery(criteriaQuery);
                return query.getSingleResult() + ProfitMandiConstants.SCHEME_INVESTMENT_MARGIN;

        }

        @Override
        @Cacheable(value = "itemSchemeCashback", cacheManager = "timeoutCacheManager")
        public Map<Integer, Float> getCatalogSchemeCashBack() {
                Map<Integer, Float> itemCashbackMap = new HashMap<>();
                Map<Integer, Scheme> cashbackSchemesMap = schemeRepository
                                .selectActiveAll(Arrays.asList(SchemeType.ACTIVATION, SchemeType.SPECIAL_SUPPORT), PartnerType.ALL, LocalDate.now(), false)
                                .stream().filter(x -> x.getAmountType().equals(AmountType.FIXED))
                                .collect(Collectors.toMap(x -> x.getId(), x -> x));
                if (cashbackSchemesMap.size() > 0) {
                        List<SchemeItem> schemeItems = schemeItemRepository.selectBySchemeIds(cashbackSchemesMap.keySet());
                        schemeItems.stream().forEach(x -> {
                                float cashbackAmount = cashbackSchemesMap.get(x.getSchemeId()).getAmount();
                                if (!itemCashbackMap.containsKey(x.getCatalogId())) {
                                        itemCashbackMap.put(x.getCatalogId(), cashbackAmount);
                                } else {
                                        itemCashbackMap.put(x.getCatalogId(), itemCashbackMap.get(x.getCatalogId()) + cashbackAmount);
                                }
                        });
                }
                // A107FD Model needs to removed
                itemCashbackMap.remove(30211);
                itemCashbackMap.remove(30212);
                itemCashbackMap.remove(30213);
                itemCashbackMap.remove(30756);
                return itemCashbackMap;
        }

        @Override
        public List<Scheme> selectSchemeByPartnerTypeFofoId(PartnerType partnerType, LocalDate onDate, int catalogId, int fofoId, int offset, int limit) throws ProfitMandiBusinessException {
                Session session = sessionFactory.getCurrentSession();
                final TypedQuery<Scheme> typedQuery = session.createNamedQuery(
                                "Scheme.selectSchemeByModelsPartnerTypeFofoId", Scheme.class);
                typedQuery.setParameter("catalogIds", Arrays.asList(catalogId));
                typedQuery.setParameter("fofoIds", Arrays.asList(fofoId, 0));
                typedQuery.setParameter("onDate", onDate.atStartOfDay());
                typedQuery.setParameter("partnerTypes", Arrays.asList(partnerType, partnerType.ALL));
                typedQuery.setFirstResult(offset);
                if (limit != 0) {
                        typedQuery.setMaxResults(limit);
                }
                return typedQuery.getResultList();
        }

        @Override
        public List<Scheme> selectSchemeByPartnerType(PartnerType partnerType, LocalDate onDate, int catalogId,
                                                                                                  boolean isAdmin, int offset, int limit) throws ProfitMandiBusinessException {
                Session session = sessionFactory.getCurrentSession();
                List<Predicate> andPredicates = new ArrayList<>();
                CriteriaBuilder cb = session.getCriteriaBuilder();
                CriteriaQuery<Scheme> query = cb.createQuery(Scheme.class);
                Root<Scheme> scheme = query.from(Scheme.class);
                if (!partnerType.equals(PartnerType.ALL)) {
                        List<PartnerType> pt = new ArrayList<>();
                        pt.add(PartnerType.ALL);
                        pt.add(partnerType);
                        andPredicates.add(cb.in(scheme.get("partnerType")).value(pt));
                }
                cb.desc(cb.isNull(scheme.get("expireTimestamp")));
                if (catalogId > 0) {

                        List<Integer> schemeIds = schemeItemRepository.selectSchemeIdByCatalogId(catalogId);
                        LOGGER.info("schemeId" + schemeIds);
                        if (schemeIds.isEmpty()) {
                                return new ArrayList<>();
                        }
                        andPredicates.add(cb.in(scheme.get("id")).value(schemeIds));
                        if (onDate != null) {
                                andPredicates.add(cb.greaterThan(scheme.get("endDateTime"), onDate.atStartOfDay()));
                                andPredicates.add(cb.lessThanOrEqualTo(scheme.get("startDateTime"), onDate.atStartOfDay()));
                        }
                }
                if (!isAdmin) {
                        andPredicates.add(cb.isNotNull(scheme.get("activeTimestamp")));
                }
                query.where(cb.and(andPredicates.toArray(new Predicate[0])));
                query.orderBy(cb.desc(cb.function("isnull", Boolean.class, scheme.get("expireTimestamp"))));
                if (limit == 0) {
                        return session.createQuery(query).setFirstResult(offset).getResultList();
                }
                return session.createQuery(query).setFirstResult(offset).setMaxResults(limit).getResultList();

        }

        @Override
        public List<Scheme> selectPercentageScheme(PartnerType partnerType, LocalDate onDate, int catalogId,
                                                                                           boolean isAdmin, int offset, int limit) throws ProfitMandiBusinessException {
                List<Scheme> schemes = this.selectSchemeByPartnerType(partnerType, onDate, catalogId, isAdmin, offset, limit);
                return schemes.stream().filter(x -> x.getAmountType().equals(AmountType.PERCENTAGE)).collect(Collectors.toList());
        }

        @Override
        public void processActivation() throws ProfitMandiBusinessException {
                List<SchemeInOut> pendingPayouts = schemeInOutRepository.selectAllPending();
                List<Integer> schemeIds = new ArrayList<>();
                Set<Integer> inventoryIds = new HashSet<>();
                for (SchemeInOut pendingPayout : pendingPayouts) {
                        schemeIds.add(pendingPayout.getSchemeId());
                }
                Map<Integer, Scheme> schemesMap = schemeRepository.selectBySchemeIds(schemeIds, 0, 0).stream()
                                .filter(x -> x.getType().equals(SchemeType.ACTIVATION) || x.getType().equals(SchemeType.SPECIAL_SUPPORT))
                                .collect(Collectors.toMap(x -> x.getId(), x -> x));
                pendingPayouts = pendingPayouts.stream().filter(x -> schemesMap.get(x.getSchemeId()) != null)
                                .collect(Collectors.toList());

                for (SchemeInOut pendingPayout : pendingPayouts) {
                        inventoryIds.add(pendingPayout.getInventoryItemId());
                }
                Map<Integer, InventoryItem> inventoryItemMap = inventoryItemRepository.selectByIds(inventoryIds).stream()
                                .collect(Collectors.toMap(x -> x.getId(), x -> x));
                Map<String, InventoryItem> serialNumberMap = inventoryItemMap.values().stream()
                                .collect(Collectors.toMap(x -> x.getSerialNumber(), x -> x));

                List<ActivatedImei> activatedImeis = activatedImeiRepository
                                .selectBySerialNumbers(new ArrayList<>(serialNumberMap.keySet())).stream().collect(Collectors.toList());

                Map<String, ActivatedImei> activatedImeiMap = activatedImeis.stream()
                                .collect(Collectors.toMap(x -> x.getSerialNumber().toLowerCase(), x -> x));
                for (SchemeInOut pendingPayout : pendingPayouts) {
                        InventoryItem ii = inventoryItemMap.get(pendingPayout.getInventoryItemId());
                        String serialNumber = ii.getSerialNumber().toLowerCase();
                        ActivatedImei activatedImei = activatedImeiMap.get(serialNumber);
                        if (activatedImei == null) {
                                continue;
                        }
                        Scheme scheme = schemesMap.get(pendingPayout.getSchemeId());
                        if (scheme.isWithinRange(activatedImei.getActivationTimestamp())) {
                                int fofoId = ii.getFofoId();
                                // Get latest order Id
                                int orderId = scanRecordRepository.selectByInventoryItemId(ii.getId()).stream()
                                                .filter(x -> x.getOrderId() > 0)
                                                .sorted(Comparator.comparing(ScanRecord::getCreateTimestamp).reversed()).findFirst().get()
                                                .getOrderId();
                                FofoOrder fofoOrder = fofoOrderRepository.selectByOrderId(orderId);
                                if (scheme.getType().equals(SchemeType.ACTIVATION)) {
                                        walletService.addAmountToWallet(fofoId, orderId, WalletReferenceType.ACTIVATION_SCHEME,
                                                        "Activation margin for " + ii.getItem().getItemDescriptionNoColor() + ", Imei - " + serialNumber, pendingPayout.getAmount(),
                                                        fofoOrder.getCreateTimestamp());
                                        pendingPayout.setStatusDescription("Activation margin credited, activated on " + FormattingUtils.formatDate(activatedImei.getActivationTimestamp()));
                                } else {
                                        walletService.addAmountToWallet(fofoId, orderId, WalletReferenceType.SPECIAL_SUPPORT,
                                                        "Special Support for " + ii.getItem().getItemDescriptionNoColor() + ", Imei - " + serialNumber, pendingPayout.getAmount(),
                                                        fofoOrder.getCreateTimestamp());
                                        pendingPayout.setStatusDescription("Special support credited, activated on " + FormattingUtils.formatDate(activatedImei.getActivationTimestamp()));
                                }
                                pendingPayout.setCreditTimestamp(LocalDateTime.now());
                                pendingPayout.setStatus(SchemePayoutStatus.CREDITED);
                        } else {
                                pendingPayout.setStatus(SchemePayoutStatus.REJECTED);
                                pendingPayout.setRolledBackTimestamp(LocalDateTime.now());
                                ;
                                pendingPayout.setStatusDescription(
                                                "Rejected, activated on " + FormattingUtils.formatDate(activatedImei.getActivationTimestamp()));
                        }
                }
        }

        @Override
        public void processActivatedImeisForSchemes() throws ProfitMandiBusinessException {
                List<SchemesImeisModel> schemesImeisModels = schemeRepository.selectSelectUnpaidSchemes();
                LOGGER.info("Total Size - " + schemesImeisModels.size());
                List<Integer> orderIds = schemesImeisModels.stream().map(x -> x.getOrderId()).collect(Collectors.toList());
                List<FofoOrder> fofoOrders = fofoOrderRepository.selectAllByOrderIds(orderIds);
                Map<Integer, FofoOrder> validOrdersMap = fofoOrders.stream().filter(x -> x.getCancelledTimestamp() == null).collect(Collectors.toMap(x -> x.getId(), x -> x));
                Map<String, List<SchemesImeisModel>> validImeiSchemesModelMap = schemesImeisModels.stream().filter(x -> validOrdersMap.containsKey(x.getOrderId())).collect(Collectors.groupingBy(x -> x.getImei()));
                for (Map.Entry<String, List<SchemesImeisModel>> imeiListEntry : validImeiSchemesModelMap.entrySet()) {
                        SchemesImeisModel schemesImeisModel = imeiListEntry.getValue().get(0);
                        List<Integer> schemeIds = imeiListEntry.getValue().stream().map(x -> x.getSchemeId()).collect(Collectors.toList());
                        LOGGER.info("Serial Number  - {}, Scheme IDs - {}", schemesImeisModel.getImei(), schemeIds);
                        InventoryItem inventoryItem = inventoryItemRepository.selectById(schemesImeisModel.getInventoryItemId());
                        List<Scheme> schemes = schemeRepository.selectBySchemeIds(schemeIds);
                        List<Scheme> supportSchemes = schemes.stream().filter(x -> Arrays.asList(SchemeType.SPECIAL_SUPPORT, SchemeType.ACTIVATION).contains(x.getType())).collect(Collectors.toList());
                        if (supportSchemes.size() > 0) {
                                FofoOrder fofoOrder = validOrdersMap.get(schemesImeisModel.getOrderId());
                                PartnerType partnerType = partnerTypeChangeService.getTypeOnMonth(fofoOrder.getFofoId(),
                                                YearMonth.from(fofoOrder.getCreateTimestamp()));
                                this.processSpecialSupport(fofoOrder, supportSchemes, inventoryItem, partnerType, fofoOrder.getCreateTimestamp());
                        }

                }


        }

}