Subversion Repositories SmartDukaan

Rev

Rev 36070 | View as "text/plain" | Blame | Compare with Previous | Last modification | View Log | RSS feed

package com.spice.profitmandi.service.scheme;

import com.spice.profitmandi.common.enumuration.ItemType;
import com.spice.profitmandi.common.enumuration.MessageType;
import com.spice.profitmandi.common.exception.ProfitMandiBusinessException;
import com.spice.profitmandi.common.model.ProfitMandiConstants;
import com.spice.profitmandi.common.model.SchemeModel;
import com.spice.profitmandi.common.model.SendNotificationModel;
import com.spice.profitmandi.common.services.ReporticoService;
import com.spice.profitmandi.common.util.FormattingUtils;
import com.spice.profitmandi.common.util.StringUtils;
import com.spice.profitmandi.common.util.Utils;
import com.spice.profitmandi.dao.entity.catalog.*;
import com.spice.profitmandi.dao.entity.fofo.*;
import com.spice.profitmandi.dao.entity.inventory.PartnerAgeingModel;
import com.spice.profitmandi.dao.entity.transaction.Loan;
import com.spice.profitmandi.dao.entity.transaction.PriceDrop;
import com.spice.profitmandi.dao.entity.transaction.UserWallet;
import com.spice.profitmandi.dao.entity.transaction.UserWalletHistory;
import com.spice.profitmandi.dao.enumuration.catalog.AmountType;
import com.spice.profitmandi.dao.enumuration.catalog.SchemeType;
import com.spice.profitmandi.dao.enumuration.catalog.StockTransactionType;
import com.spice.profitmandi.dao.enumuration.fofo.ScanType;
import com.spice.profitmandi.dao.enumuration.transaction.SchemePayoutStatus;
import com.spice.profitmandi.dao.model.AmountModel;
import com.spice.profitmandi.dao.model.CreateSchemeRequest;
import com.spice.profitmandi.dao.repository.catalog.*;
import com.spice.profitmandi.dao.repository.dtr.FofoStoreRepository;
import com.spice.profitmandi.dao.repository.dtr.RetailerRepository;
import com.spice.profitmandi.dao.repository.fofo.*;
import com.spice.profitmandi.dao.repository.transaction.LoanRepository;
import com.spice.profitmandi.dao.repository.transaction.PriceDropRepository;
import com.spice.profitmandi.dao.repository.transaction.UserWalletHistoryRepository;
import com.spice.profitmandi.dao.repository.transaction.UserWalletRepository;
import com.spice.profitmandi.dao.repository.warehouse.AgeingSummaryModel;
import com.spice.profitmandi.dao.repository.warehouse.WarehouseInventoryItemRepository;
import com.spice.profitmandi.service.NotificationService;
import com.spice.profitmandi.service.authentication.RoleManager;
import com.spice.profitmandi.service.inventory.AgeingService;
import com.spice.profitmandi.service.inventory.PurchaseService;
import com.spice.profitmandi.service.pricecircular.CatalogSummaryModel;
import com.spice.profitmandi.service.pricecircular.PriceCircularService;
import com.spice.profitmandi.service.pricecircular.SchemeSummaryModel;
import com.spice.profitmandi.service.wallet.WalletService;
import in.shop2020.model.v1.order.WalletReferenceType;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.hibernate.Session;
import org.hibernate.SessionFactory;
import org.hibernate.query.Query;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.cache.annotation.Cacheable;
import org.springframework.stereotype.Component;

import javax.persistence.TypedQuery;
import javax.persistence.criteria.CriteriaBuilder;
import javax.persistence.criteria.CriteriaQuery;
import javax.persistence.criteria.Predicate;
import javax.persistence.criteria.Root;
import java.text.MessageFormat;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.YearMonth;
import java.util.*;
import java.util.stream.Collectors;

@Component
public class SchemeServiceImpl implements SchemeService {

    private static final Logger LOGGER = LogManager.getLogger(SchemeServiceImpl.class);
    private static final Set<Integer> tagIds = new HashSet<>(Arrays.asList(4));
    private static final List<SchemeType> FULL_DAYS_CD_SCHEME_TYPES = Arrays.asList(SchemeType.CASH_DISCOUNT, SchemeType.CASH_DISCOUNT1);
    private static final List<SchemeType> HALF_DAYS_CD_REJECT_SCHEME_TYPES = Arrays.asList(SchemeType.CASH_DISCOUNT1);


    private static final List<SchemeType> ACTIVATION_SCHEME_TYPES = Arrays.asList(SchemeType.SPECIAL_SUPPORT, SchemeType.SELLOUT);
    private static final List<SchemeType> NOFITY_SCHEME_TYPES = Arrays.asList(SchemeType.SPECIAL_SUPPORT, SchemeType.SELLOUT, SchemeType.SELLIN);

    @Autowired
    SchemeBlockedImeiRepository schemeBlockedImeiRepository;
    @Autowired
    StateGstRateRepository stateGstRateRepository;
    @Autowired
    NotificationService notificationService;
    @Autowired
    @Qualifier("fofoInventoryItemRepository")
    private InventoryItemRepository inventoryItemRepository;
    @Autowired
    private ActivatedImeiRepository activatedImeiRepository;
    @Autowired
    private PartnerTypeChangeService partnerTypeChangeService;
    @Autowired
    private PurchaseService purchaseService;
    @Autowired
    private ScanRecordRepository scanRecordRepository;
    @Autowired
    private SessionFactory sessionFactory;
    @Autowired
    private SchemeRepository schemeRepository;
    @Autowired
    private PriceDropRepository priceDropRepository;
    @Autowired
    private RoleManager roleManager;
    @Autowired
    private RetailerRepository retailerRepository;
    @Autowired
    private ReporticoService reporticoService;
    @Autowired
    private TagListingRepository tagListingRepository;
    @Autowired
    private SchemeInOutRepository schemeInOutRepository;
    @Autowired
    @Qualifier("catalogItemRepository")
    private ItemRepository itemRepository;
    @Autowired
    private SchemeItemRepository schemeItemRepository;
    @Autowired
    private SchemeRegionRepository schemeRegionRepository;
    @Autowired
    private WalletService walletService;
    @Autowired
    private PurchaseRepository purchaseRepository;
    @Autowired
    private FofoOrderRepository fofoOrderRepository;
    @Autowired
    private PriceCircularService priceCircularService;
    @Autowired
    private SamsungExceptionRepository samsungExceptionRepository;

    @Override
    public List<String> getBlockedImeis() {
        List<SchemeBlockedImei> schemeBlockedImeis = schemeBlockedImeiRepository.selectAll();
        return schemeBlockedImeis.stream().filter(x -> x.isBlocked()).map(x -> x.getImei()).collect(Collectors.toList());
    }

    @Override
    public void saveScheme(int creatorId, CreateSchemeRequest createSchemeRequest) throws ProfitMandiBusinessException {

        this.validateCreateSchemeRequest(createSchemeRequest);

        Scheme scheme = this.toScheme(creatorId, createSchemeRequest);

        if (scheme.getStartDateTime().isAfter(scheme.getEndDateTime())) {
            throw new ProfitMandiBusinessException(
                    ProfitMandiConstants.START_DATE + ", " + ProfitMandiConstants.END_DATE,
                    scheme.getStartDateTime() + ", " + scheme.getEndDateTime(), "SCHM_VE_1005");
        }

        // this.validateItemIds(createSchemeRequest);
        schemeRepository.persist(scheme);
        for (int catalogId : createSchemeRequest.getCatalogIds()) {
            SchemeItem schemeItem = new SchemeItem();
            schemeItem.setSchemeId(scheme.getId());
            schemeItem.setCatalogId(catalogId);
            schemeItem.setStartDate(createSchemeRequest.getStartDate());
            schemeItem.setEndDate(createSchemeRequest.getEndDate());
            schemeItemRepository.persist(schemeItem);
        }

        for (int regionId : createSchemeRequest.getRegionIds()) {
            SchemeRegion schemeRegion = new SchemeRegion();
            schemeRegion.setSchemeId(scheme.getId());
            schemeRegion.setRegionId(regionId);
            schemeRegionRepository.persist(schemeRegion);
        }

    }

    private void validateCreateSchemeRequest(CreateSchemeRequest createSchemeRequest)
            throws ProfitMandiBusinessException {
        if (createSchemeRequest.getName() == null || createSchemeRequest.getName().isEmpty()) {
            throw new ProfitMandiBusinessException(ProfitMandiConstants.NAME, createSchemeRequest.getName(),
                    "SCHM_VE_1000");
        }
        if (createSchemeRequest.getAmount() <= 0) {
            throw new ProfitMandiBusinessException(ProfitMandiConstants.AMOUNT, createSchemeRequest.getAmount(),
                    "SCHM_VE_1001");
        }

        if (createSchemeRequest.getAmountType().equals(AmountType.PERCENTAGE)
                && createSchemeRequest.getAmount() > 100) {
            throw new ProfitMandiBusinessException(ProfitMandiConstants.AMOUNT, createSchemeRequest.getAmount(),
                    "SCHM_VE_1002");
        }

        if (createSchemeRequest.getStartDate() == null) {
            throw new ProfitMandiBusinessException(ProfitMandiConstants.START_DATE, createSchemeRequest.getStartDate(),
                    "SCHM_VE_1003");
        }
        if (createSchemeRequest.getEndDate() == null) {
            throw new ProfitMandiBusinessException(ProfitMandiConstants.END_DATE, createSchemeRequest.getEndDate(),
                    "SCHM_VE_1004");
        }
    }

    private void validateItemIds(CreateSchemeRequest createSchemeRequest) throws ProfitMandiBusinessException {
        if (createSchemeRequest.getCatalogIds() == null || createSchemeRequest.getCatalogIds().isEmpty()) {
            throw new ProfitMandiBusinessException(ProfitMandiConstants.ITEM_ID, createSchemeRequest.getCatalogIds(),
                    "SCHM_1003");
        }
        List<Integer> foundItemIds = itemRepository.selectIdsByIdsAndType(createSchemeRequest.getCatalogIds(),
                ItemType.SERIALIZED);
        if (foundItemIds.size() != createSchemeRequest.getCatalogIds().size()) {
            createSchemeRequest.getCatalogIds().removeAll(foundItemIds);
            throw new ProfitMandiBusinessException(ProfitMandiConstants.ITEM_ID, createSchemeRequest.getCatalogIds(),
                    "SCHM_1004");
        }
    }

    @Override
    public Scheme getSchemeById(int schemeId) throws ProfitMandiBusinessException {
        Scheme scheme = schemeRepository.selectById(schemeId);
        List<Integer> catalogIds = schemeItemRepository.selectCatalogIdsBySchemeId(scheme.getId());
        if (catalogIds.size() > 0) {
            List<Item> items = itemRepository.selectAllByCatalogIds(new HashSet<>(catalogIds));
            scheme.setCatalogStringMap(this.toCatalogStringMap(items));
        }
        return scheme;
    }

    public Map<Integer, String> toCatalogStringMap(List<Item> items) {
        Map<Integer, String> catalogMap = new HashMap<>();
        for (Item item : items) {
            if (!catalogMap.containsKey(item.getCatalogItemId())) {
                catalogMap.put(item.getCatalogItemId(), item.getItemDescriptionNoColor());
            }
        }
        return catalogMap;
    }

    private Set<Integer> schemeItemsToCatalogIds(List<SchemeItem> schemeItems) {
        Set<Integer> catalogId = new HashSet<>();
        for (SchemeItem schemeItem : schemeItems) {
            catalogId.add(schemeItem.getCatalogId());
        }
        return catalogId;
    }

    @Override
    public List<SchemeModel> getAllSchemeModels(LocalDateTime startDateTime, LocalDateTime endDateTime) throws ProfitMandiBusinessException {
        List<Scheme> schemes = schemeRepository.selectAllBetweenCreateTimestamp(startDateTime, endDateTime);
        Map<Integer, Scheme> schemeIdSchemeMap = schemes.stream().collect(Collectors.toMap(x -> x.getId(), x -> x));
        List<SchemeItem> schemeItems = schemeItemRepository.selectBySchemeIds(schemeIdSchemeMap.keySet());
        Set<Integer> catalogIds = schemeItems.stream().map(x -> x.getCatalogId()).collect(Collectors.toSet());
        List<Item> items = itemRepository.selectAllByCatalogIds(catalogIds);
        Map<Integer, String> catalogStringMap = this.toCatalogStringMap(items);
        this.addCatalogIdsToSchemes(schemeItems, schemeIdSchemeMap, catalogStringMap);
        return this.toSchemeModels(schemeIdSchemeMap);
    }

    private List<SchemeModel> toSchemeModels(Map<Integer, Scheme> schemeIdSchemeMap) {
        List<SchemeModel> schemeModels = new ArrayList<>();
        for (Map.Entry<Integer, Scheme> schemeIdSchemeEntry : schemeIdSchemeMap.entrySet()) {
            schemeModels.add(this.toSchemeModel(schemeIdSchemeEntry.getValue()));
        }
        return schemeModels;
    }

    private SchemeModel toSchemeModel(Scheme scheme) {
        SchemeModel schemeModel = new SchemeModel();
        schemeModel.setSchemeId(scheme.getId());
        schemeModel.setName(scheme.getName());
        schemeModel.setDescription(scheme.getDescription());
        schemeModel.setSchemeType(scheme.getType().toString());
        schemeModel.setAmountType(scheme.getAmountType().toString());
        schemeModel.setAmount(scheme.getAmount());
        schemeModel.setStartDateTime(StringUtils.toString(scheme.getStartDateTime()));
        schemeModel.setEndDateTime(StringUtils.toString(scheme.getEndDateTime()));
        schemeModel.setCreateTimestamp(StringUtils.toString(scheme.getCreateTimestamp()));
        schemeModel.setActiveTimestamp(StringUtils.toString(scheme.getActiveTimestamp()));
        schemeModel.setExpireTimestamp(StringUtils.toString(scheme.getExpireTimestamp()));
        schemeModel.setCreatedBy(scheme.getCreatedBy());
        schemeModel.setCatalogStringMap(scheme.getCatalogStringMap());
        schemeModel.setRetailerIds(scheme.getRetailerIds());
        return schemeModel;
    }

    private void addCatalogIdsToSchemes(List<SchemeItem> schemeItems, Map<Integer, Scheme> schemeIdSchemeMap,
                                        Map<Integer, String> catalogStringMap) {
        for (SchemeItem schemeItem : schemeItems) {
            Scheme scheme = schemeIdSchemeMap.get(schemeItem.getSchemeId());
            scheme.getCatalogStringMap().put(schemeItem.getCatalogId(), catalogStringMap.get(schemeItem.getCatalogId()));
        }
    }

    @Override
    public void activeSchemeById(int schemeId) throws ProfitMandiBusinessException {
        Scheme scheme = schemeRepository.selectById(schemeId);
        if (scheme.getActiveTimestamp() != null) {
            throw new ProfitMandiBusinessException(ProfitMandiConstants.ACTIVE_TIMESTAMP, scheme.getActiveTimestamp(),
                    "SCHM_1005");
        }
        if (scheme.getExpireTimestamp() != null) {
            throw new ProfitMandiBusinessException(ProfitMandiConstants.EXPIRE_TIMESTAMP, scheme.getExpireTimestamp(),
                    "SCHM_1006");
        }
        scheme.setActiveTimestamp(LocalDateTime.now());
        this.sendSchemeNotification(scheme);


        /*
         * if (scheme.getType() == SchemeType.IN) {
         * this.processPreviousPurchases(scheme); } else if (scheme.getType() ==
         * SchemeType.OUT) { this.processPreviousSales(scheme); }
         */
    }

    @Override
    public void activeSchemeByIds(List<Scheme> schemes) throws ProfitMandiBusinessException {
        if (schemes.size() > 0) {
            for (Scheme scheme : schemes) {
                if (scheme.getActiveTimestamp() != null || scheme.getExpireTimestamp() != null) {
                    continue;
                }
                scheme.setActiveTimestamp(LocalDateTime.now());
            }
            this.sendCombinedSchemesNotification(schemes);
        }
    }

    @Override
    public void expireSchemeByIds(List<Scheme> schemes) throws ProfitMandiBusinessException {
        if (schemes.size() > 0) {
            for (Scheme scheme : schemes) {
                if (scheme.getExpireTimestamp() != null) {
                    continue;
                }
                scheme.setExpireTimestamp(scheme.getEndDateTime());
            }
            //this.sendCombinedSchemesNotification(schemes);
        }
    }

    private void sendSchemeNotification(Scheme scheme) throws ProfitMandiBusinessException {
        if (NOFITY_SCHEME_TYPES.contains(scheme.getType())) {

            SendNotificationModel sendNotificationModel = new SendNotificationModel();
            List<SchemeItem> schemeItems = schemeItemRepository.selectBySchemeIds(Collections.singleton(scheme.getId()));
            Set<Integer> catalogIds = schemeItems.stream().map(x -> x.getCatalogId()).collect(Collectors.toSet());
            List<String> itemDescriptions = itemRepository.selectAllByCatalogIds(catalogIds).stream().filter(Utils.distinctByKey(Item::getCatalogItemId))
                    .map(x -> x.getItemDescriptionNoColor()).collect(Collectors.toList());
            String titleTemplate = "Important Notice!\n\n %s of Rs.%s for %s";
            String schemeString = "Activation scheme";
            if (scheme.getType().equals(SchemeType.SPECIAL_SUPPORT)) {
                schemeString = "Special Support";
            }

            String message = "Duration from - " + FormattingUtils.formatDateMonth(scheme.getStartDateTime()) + " - " + FormattingUtils.formatDateMonth(scheme.getEndDateTime());
            sendNotificationModel.setCampaignName("activationscheme");
            sendNotificationModel.setUrl("https://smartdukaan.com/pages/home/notifications");
            sendNotificationModel.setExpiresat(LocalDateTime.now().plusDays(1));
            sendNotificationModel.setMessage(message);
            sendNotificationModel.setTitle(String.format(titleTemplate, schemeString, FormattingUtils.formatDecimal(scheme.getAmount()), org.apache.commons.lang3.StringUtils.abbreviate(String.join(", ", itemDescriptions), 25)));
            sendNotificationModel.setType("url");
            sendNotificationModel.setMessageType(MessageType.scheme);
            List<SchemeRegion> schemeRegionList = schemeRegionRepository.selectAllBySchemeIds(Arrays.asList(scheme.getId()));
            sendNotificationModel.setRegionIds(schemeRegionList.stream().map(x->x.getRegionId()).collect(Collectors.toList()));
            notificationService.sendNotification(sendNotificationModel);
        }
    }

    @Autowired
    LoanRepository loanRepository;

    @Override
    public void expireSchemeById(int schemeId, LocalDateTime expiryTime) throws ProfitMandiBusinessException {
        Scheme scheme = schemeRepository.selectById(schemeId);
        if (scheme == null || scheme.getActiveTimestamp() == null) {
            throw new ProfitMandiBusinessException(ProfitMandiConstants.ACTIVE_TIMESTAMP, scheme.getActiveTimestamp(),
                    "SCHM_1007");
        }
        if (scheme.getExpireTimestamp() != null) {
            throw new ProfitMandiBusinessException(ProfitMandiConstants.EXPIRE_TIMESTAMP, scheme.getExpireTimestamp(),
                    "SCHM_1008");
        }
        scheme.setExpireTimestamp(LocalDateTime.now());
        if (expiryTime.isAfter(scheme.getEndDateTime())) {
            throw new ProfitMandiBusinessException(ProfitMandiConstants.EXPIRE_TIMESTAMP, scheme.getExpireTimestamp(),
                    "End Date cant be extended during expiry");
        }
        scheme.setEndDateTime(expiryTime);
        schemeRepository.persist(scheme);
    }

    private Map<Integer, Scheme> toSchemeIdSchemeMap(List<Scheme> schemes) {
        Map<Integer, Scheme> schemeIdSchemeMap = new HashMap<>();
        for (Scheme scheme : schemes) {
            schemeIdSchemeMap.put(scheme.getId(), scheme);
        }
        return schemeIdSchemeMap;
    }

    private Map<Integer, Set<Scheme>> toCatalogIdSchemesMap(List<SchemeItem> schemeItems, List<Scheme> schemes) {
        Map<Integer, Scheme> schemesMap = schemes.stream().collect(Collectors.toMap(x -> x.getId(), x -> x));
        Map<Integer, Set<Scheme>> catalogSchemesMap = new HashMap<>();
        for (SchemeItem schemeItem : schemeItems) {
            if (!catalogSchemesMap.containsKey(schemeItem.getCatalogId())) {
                catalogSchemesMap.put(schemeItem.getCatalogId(), new HashSet<>());
            }
            Set<Scheme> schemesSet = catalogSchemesMap.get(schemeItem.getCatalogId());
            schemesSet.add(schemesMap.get(schemeItem.getSchemeId()));
        }
        return catalogSchemesMap;
    }

    private Map<InventoryItem, Set<Scheme>> toInventoryItemSchemesMap(List<Scheme> schemes,
                                                                      List<InventoryItem> inventoryItems) throws ProfitMandiBusinessException {
        Set<Integer> schemeIds = schemes.stream().map(x -> x.getId()).collect(Collectors.toSet());
        Set<Integer> itemIds = inventoryItems.stream().map(x -> x.getItemId()).collect(Collectors.toSet());
        Set<Integer> catalogIds = itemRepository.selectByIds(itemIds).stream().map(x -> x.getCatalogItemId()).collect(Collectors.toSet());
        List<SchemeItem> schemeItems = schemeItemRepository.selectBySchemeIdsAndCatalogIds(schemeIds, catalogIds);

        Map<Integer, Set<Scheme>> catalogIdSchemesMap = this.toCatalogIdSchemesMap(schemeItems, schemes);
        Map<InventoryItem, Set<Scheme>> inventoryItemSchemesMap = new HashMap<>();
        for (InventoryItem inventoryItem : inventoryItems) {
            LOGGER.info("inventoryItem {}", inventoryItem);
            LOGGER.info("inventoryItem.getItem() {}", inventoryItem.getItem());
            LOGGER.info("catalogIdSchemesMap {}", catalogIdSchemesMap);
            if (catalogIdSchemesMap.containsKey(inventoryItem.getItem().getCatalogItemId())) {
                inventoryItemSchemesMap.put(inventoryItem, catalogIdSchemesMap.get(inventoryItem.getItem().getCatalogItemId()));
            }
        }
        return inventoryItemSchemesMap;
    }

    @Autowired
    OfferTargetSlabRepository offerTargetSlabRepository;

    private Scheme toScheme(int creatorId, CreateSchemeRequest createSchemeRequest) {
        Scheme scheme = new Scheme();
        scheme.setName(createSchemeRequest.getName());
        scheme.setDescription(createSchemeRequest.getDescription());
        scheme.setType(createSchemeRequest.getType());
        scheme.setAmountType(createSchemeRequest.getAmountType());
        scheme.setAmount(createSchemeRequest.getAmount());
        scheme.setPartnerType(createSchemeRequest.getPartnerType());
        scheme.setStartDateTime(createSchemeRequest.getStartDate());
        scheme.setEndDateTime(createSchemeRequest.getEndDate());
        scheme.setCreatedBy(creatorId);
        scheme.setCashback(createSchemeRequest.isCashback());
        return scheme;
    }

    @Autowired
    OfferRepository offerRepository;
    @Autowired
    OfferPayoutRepository offerPayoutRepository;
    @Autowired
    AgeingService ageingService;

    @Override
    public void sendCombinedSchemesNotification(List<Scheme> schemes) throws ProfitMandiBusinessException {
        if (schemes == null || schemes.isEmpty()) return;


        List<Scheme> filteredSchemes = schemes.stream()
                .filter(s -> NOFITY_SCHEME_TYPES.contains(s.getType())).collect(Collectors.toList());

        Map<Integer, Scheme> schemesMap = filteredSchemes.stream().collect(Collectors.toMap(x -> x.getId(), x -> x));

        Map<Integer, List<Integer>> schemeIdRegionIdsMap = schemeRegionRepository.selectAllBySchemeIds(new ArrayList<>(schemesMap.keySet()))
                .stream().collect(Collectors.groupingBy(SchemeRegion::getSchemeId,
                        Collectors.collectingAndThen(Collectors.mapping(SchemeRegion::getRegionId, Collectors.toList()),
                                list -> {
                                    list.sort(Integer::compareTo); // sort ascending
                                    return list;
                                })));

        // Batch fetch all schemeItems for all filtered schemes at once
        Map<Integer, List<SchemeItem>> schemeItemsBySchemeId = schemeItemRepository.selectBySchemeIds(schemesMap.keySet())
                .stream().collect(Collectors.groupingBy(SchemeItem::getSchemeId));

        // Collect all catalogIds for batch fetching items
        Set<Integer> allCatalogIdsForFetch = schemeItemsBySchemeId.values().stream()
                .flatMap(List::stream)
                .map(SchemeItem::getCatalogId)
                .collect(Collectors.toSet());

        // Batch fetch all items by catalogIds once
        Map<Integer, List<Item>> itemsByCatalogId = allCatalogIdsForFetch.isEmpty()
                ? Collections.emptyMap()
                : itemRepository.selectAllByCatalogIds(allCatalogIdsForFetch).stream()
                        .collect(Collectors.groupingBy(Item::getCatalogItemId));

        Map<List<Integer>, Map<String, List<Scheme>>> groupedByRegionDate = filteredSchemes.stream()
                .collect(Collectors.groupingBy(s -> schemeIdRegionIdsMap.get(s.getId()), Collectors.groupingBy(s ->
                        String.valueOf(s.getStartDateTime()).substring(0, 10) + " - " +
                                String.valueOf(s.getEndDateTime()).substring(0, 10)
                )));

        for (Map.Entry<List<Integer>, Map<String, List<Scheme>>> regionIdsDateKeyset : groupedByRegionDate.entrySet()) {
            SendNotificationModel sendNotificationModel = new SendNotificationModel();
            StringBuilder messageBuilder = new StringBuilder();
            Set<Integer> allCatalogIds = new HashSet<>();
            Set<String> allBrands = new HashSet<>();
            String schemeTypeLabel = null;
            List<Integer> regionIds = regionIdsDateKeyset.getKey();
            for (Map.Entry<String, List<Scheme>> entry : regionIdsDateKeyset.getValue().entrySet()) {
                String dateRange = entry.getKey();
                List<Scheme> groupedSchemes = entry.getValue();


                messageBuilder.append(String.format("Duration: %s\n", dateRange));

                for (Scheme scheme : groupedSchemes) {
                    if (scheme.getType().equals(SchemeType.SPECIAL_SUPPORT)) {
                        schemeTypeLabel = "Special Support";
                    } else if (scheme.getType().equals(SchemeType.SELLOUT)) {
                        schemeTypeLabel = "Sellout Scheme";
                    } else if (scheme.getType().equals(SchemeType.SELLIN)) {
                        schemeTypeLabel = "Sellin Scheme";
                    }

                    // Use pre-fetched schemeItems instead of N+1 query
                    List<SchemeItem> schemeItems = schemeItemsBySchemeId.getOrDefault(scheme.getId(), Collections.emptyList());

                    Set<Integer> catalogIds = schemeItems.stream()
                            .map(SchemeItem::getCatalogId)
                            .collect(Collectors.toSet());
                    allCatalogIds.addAll(catalogIds);

                    // Use pre-fetched items instead of duplicate DB calls
                    List<Item> catalogItems = catalogIds.stream()
                            .flatMap(cid -> itemsByCatalogId.getOrDefault(cid, Collections.emptyList()).stream())
                            .collect(Collectors.toList());

                    List<String> itemBrand = catalogItems.stream()
                            .map(Item::getBrand)
                            .filter(Objects::nonNull)
                            .distinct()
                            .collect(Collectors.toList());
                    allBrands.addAll(itemBrand);

                    List<String> itemDescriptions = catalogItems.stream()
                            .filter(Utils.distinctByKey(Item::getCatalogItemId))
                            .map(Item::getItemDescriptionNoColor)
                            .collect(Collectors.toList());

                    messageBuilder.append(String.format("%s of Rs.%s on Models: %s\n",
                            schemeTypeLabel,
                            FormattingUtils.formatDecimal(scheme.getAmount()),
                            String.join(", ", itemDescriptions)));
                }
                messageBuilder.append("\n");
            }

            if (allBrands.isEmpty() || schemeTypeLabel == null) return;

            String title = String.format("%s for %s",
                    schemeTypeLabel,
                    String.join(", ", allBrands));

            sendNotificationModel.setCampaignName("activationscheme");
            sendNotificationModel.setUrl("https://store.smartdukaan.com/pages/home/notifications");
            sendNotificationModel.setExpiresat(LocalDateTime.now().plusDays(1));
            sendNotificationModel.setTitle(title);
            sendNotificationModel.setMessage(messageBuilder.toString().trim());
            sendNotificationModel.setType("url");
            sendNotificationModel.setMessageType(MessageType.scheme);
            sendNotificationModel.setRegionIds(regionIds);
            System.out.println(sendNotificationModel);
            notificationService.sendNotification(sendNotificationModel);
        }

    }

    //CDs would be rejected based on cdFreeDays
    public void processSchemeIn(int purchaseId, int retailerId) throws ProfitMandiBusinessException {
        LOGGER.info("Processing scheme in  for purchaseId - {}", purchaseId);
        Purchase purchase = purchaseRepository.selectByIdAndFofoId(purchaseId, retailerId);
        String purchaseInvoice = purchase.getPurchaseReference();

        Loan loan = loanRepository.selectLoanByInvoice(purchaseInvoice);

        PartnerType partnerType = partnerTypeChangeService.getTypeOnMonth(retailerId,
                YearMonth.from(purchase.getCreateTimestamp()));
        List<InventoryItem> inventoryItems = inventoryItemRepository.selectByPurchaseId(purchaseId);
        //Remove imeis from blocked imeis list
        List<String> blockedImeis = this.getBlockedImeis();
        inventoryItems = inventoryItems.stream().filter(inventoryItem -> !blockedImeis.contains(inventoryItem.getSerialNumber())).collect(Collectors.toList());
        if (inventoryItems.size() == 0) return;

        Set<Integer> itemIds = inventoryItems.stream().map(x -> x.getItemId()).collect(Collectors.toSet());
        Map<Integer, Item> itemsMap = itemRepository.selectByIds(itemIds).stream().collect(Collectors.toMap(x -> x.getId(), x -> x));
        inventoryItems.stream().forEach(x -> x.setItem(itemsMap.get(x.getItemId())));


        LocalDateTime billingDate = purchaseService.getBillingDateOfPurchase(purchaseId);
        Set<Integer> itemIdsSet = tagListingRepository.selectByItemIdsAndTagIds(itemIds, tagIds).stream()
                .filter(x -> x.getEolDate() == null || x.getEolDate().isAfter(billingDate)).map(x -> x.getItemId())
                .collect(Collectors.toSet());
        // Only consider inventory items that were not returned and not eol
        inventoryItems = inventoryItems.stream().filter(x -> itemIdsSet.contains(x.getItemId()))
                .filter(x -> !x.getLastScanType().equals(ScanType.PURCHASE_RET_BAD))
                .filter(x -> !x.getLastScanType().equals(ScanType.PURCHASE_RET)).collect(Collectors.toList());

        if (inventoryItems.size() == 0) return;

        Map<Integer, List<InventoryItem>> catalogInventoryItemMap = inventoryItems.stream().collect(Collectors.groupingBy(x -> x.getItem().getCatalogItemId()));
        Map<CatalogSummaryModel, List<SchemeSummaryModel>> catalogSchemeSummaryMap = tagListingRepository.getModelSchemesByCatalogIdsAndType(retailerId,
                partnerType, new ArrayList<>(catalogInventoryItemMap.keySet()), billingDate);


        int itemsCount = 0;
        float totalCashback = 0;
        //LOGGER.info("catalogSchemeSummaryMap - {}", catalogSchemeSummaryMap);
        for (Map.Entry<CatalogSummaryModel, List<SchemeSummaryModel>> catalogSummaryModelListEntry : catalogSchemeSummaryMap.entrySet()) {
            CatalogSummaryModel catalogSummaryModel = catalogSummaryModelListEntry.getKey();
            List<SchemeSummaryModel> schemeSummaryModels = catalogSummaryModelListEntry.getValue().stream().filter(Objects::nonNull).collect(Collectors.toList());
            schemeSummaryModels.stream().filter(x -> x != null && x.getSchemeType().getTransactionType().equals(StockTransactionType.IN)).forEach(x -> x.setProcess(true));
            if (schemeSummaryModels.stream().filter(x -> x.isProcess()).count() == 0) continue;
            List<InventoryItem> modelInventoryItems = catalogInventoryItemMap.get(catalogSummaryModel.getCatalogId());
            for (InventoryItem inventoryItem : modelInventoryItems) {
                float inventoryItemCashback = this.createSchemeInOut(schemeSummaryModels, inventoryItem);
                if (loan != null && loan.getCdFreeDays() > 0) {
                    List<SchemeType> rejectedCDTypes = HALF_DAYS_CD_REJECT_SCHEME_TYPES;
                    if (loan.getCdFreeDays() == ProfitMandiConstants.LOAN_FULL_CREDIT_DAYS
                            || loan.getCdFreeDays() == ProfitMandiConstants.PREMIUM_LOAN_FULL_CREDIT_DAYS) {
                        rejectedCDTypes = FULL_DAYS_CD_SCHEME_TYPES;
                    }
                    for (SchemeType rejectedCDType : rejectedCDTypes) {
                        // Fixed: single DB call instead of duplicate calls
                        List<SchemeInOut> cdSios = schemeInOutRepository.selectAllByType(rejectedCDType, inventoryItem.getId());
                        SchemeInOut sio = cdSios.isEmpty() ? null : cdSios.get(0);
                        if (sio == null) continue;
                        sio.setStatus(SchemePayoutStatus.REJECTED);
                        //Noone should change this
                        sio.setStatusDescription("Rejected due to free days availed");
                        sio.setRolledBackTimestamp(LocalDateTime.now());
                        inventoryItemCashback -= sio.getAmount();
                    }

                }
                if (inventoryItemCashback > 0.01f) {
                    itemsCount++;
                    totalCashback += inventoryItemCashback;
                }
            }
        }


        LOGGER.info("Items count for purchase id {} is {}", purchaseId, itemsCount);
        if (itemsCount > 0) {
            walletService.addAmountToWallet(
                    retailerId, purchaseId, WalletReferenceType.SCHEME_IN, "Added for SCHEME IN against invoice "
                            + purchase.getPurchaseReference() + " (total " + itemsCount + " pcs)",
                    totalCashback, purchase.getCreateTimestamp());
            LOGGER.info("Added Rs.{} for SCHEME IN against invoice {} total pcs({}) {}", totalCashback,
                    purchase.getPurchaseReference(), itemsCount);
        }

    }


    // Calculate payout amount for an inventory item based on scheme type.
    // For PERCENTAGE schemes: uses price-drop-adjusted DP (purchase price minus any price drop).
    //   GRN (IN): dpForCalc = purchasePrice - priceDropAmount
    //   Sale (OUT): dpForCalc = min(purchasePrice - priceDropAmount, currentSellingPrice)
    //   On price increase, priceDropAmount stays 0 so payout is on original purchase price.
    // For FIXED schemes (IN only): returns the fixed scheme amount directly.
    private float getAmount(InventoryItem inventoryItem, Scheme scheme) throws ProfitMandiBusinessException {
        if (this.getBlockedImeis().contains(inventoryItem.getSerialNumber())) {
            return 0;
        }
        float amount = 0;
        float dpForCalc = 0;

        if (scheme.getAmountType().equals(AmountType.PERCENTAGE)) {
            if (scheme.getType().getTransactionType().equals(StockTransactionType.IN)) {
                dpForCalc = inventoryItem.getUnitPrice() - inventoryItem.getPriceDropAmount();
            } else {
                try {
                    dpForCalc = Math.min(inventoryItem.getUnitPrice() - inventoryItem.getPriceDropAmount(),
                            tagListingRepository.selectByItemId(inventoryItem.getItemId()).getSellingPrice());
                } catch (Exception e) {
                    LOGGER.info("Could not find tag Listing entry for itemId {}", inventoryItem.getItemId());
                }
            }
            amount = dpForCalc * scheme.getAmount() / 100;
            LOGGER.debug("Scheme payout: invId={}, imei={}, schemeId={}, dpForCalc={}, schemeAmt={}%, payout={}",
                    inventoryItem.getId(), inventoryItem.getSerialNumber(), scheme.getId(),
                    dpForCalc, scheme.getAmount(), amount);
        } else if (scheme.getType().getTransactionType().equals(StockTransactionType.IN)) {
            amount = scheme.getAmount();
        }
        return amount;
    }

    //Only in and activation margins are allowed to be rolled out more than twice
    private float createSchemeInOut(List<SchemeSummaryModel> schemeSummaryModels, InventoryItem inventoryItem) throws ProfitMandiBusinessException {
        LOGGER.info("schemeSummaryModels - {}", schemeSummaryModels);
        InventoryPayoutModel inventoryPayoutModel = priceCircularService.getPayouts(inventoryItem);
        //Get all schemes
        List<SchemeSummaryModel> inventoryPayoutModelToProcess = schemeSummaryModels.stream().filter(x -> x.isProcess()).collect(Collectors.toList());
        LOGGER.info("inventoryPayoutModel - {}", inventoryPayoutModel);
        List<SchemeInOut> paidSios = inventoryPayoutModel.getPaidSios();
        List<SchemeInOut> pendingSios = inventoryPayoutModel.getPendingSios();
        Map<SchemeType, List<SchemeInOut>> paidSchemeTypesMap = inventoryPayoutModel.getPaidSios().stream().collect(Collectors.groupingBy(x -> x.getScheme().getType()));
        Map<Integer, SchemeInOut> paidSchemesMap = paidSios.stream().collect(Collectors.toMap(x -> x.getSchemeId(), x -> x));
        Map<Integer, SchemeInOut> pendingSchemesMap = pendingSios.stream().collect(Collectors.toMap(x -> x.getSchemeId(), x -> x));
        Map<SchemeType, Float> schemeTypeCancelledAmountMap = new HashMap<>();

        double percentageToPay = 0d;
        double fixedToPay = 0d;
        Map<SchemeSummaryModel, AmountModel> payoutSchemeSummaryModelMap = new HashMap<>();
        for (SchemeSummaryModel schemeSummaryModelToProcess : inventoryPayoutModelToProcess) {
            if (paidSchemesMap.containsKey(schemeSummaryModelToProcess.getSchemeId()) || pendingSchemesMap.containsKey(schemeSummaryModelToProcess.getSchemeId()))
                continue;
            //All Valid margins are supposed to be credited invalid margins should be rejected
            if (SchemeType.SPECIAL_SUPPORT.equals(schemeSummaryModelToProcess.getSchemeType())) {
                //Create only if the activation date is not known
                ActivatedImei activatedImei = activatedImeiRepository.selectBySerialNumber(inventoryItem.getSerialNumber());
                if (activatedImei == null || activatedImei.getActivationTimestamp() == null) {
                    SchemeInOut sio = new SchemeInOut();
                    sio.setAmount(0);
                    sio.setInventoryItemId(inventoryItem.getId());
                    sio.setSchemeId(schemeSummaryModelToProcess.getSchemeId());
                    sio.setStatusDescription("Activation pending for IMEI#" + inventoryItem.getSerialNumber());
                    sio.setStatus(SchemePayoutStatus.PENDING);
                    schemeInOutRepository.persist(sio);
                }
            } else if (!StockTransactionType.IN.equals(schemeSummaryModelToProcess.getSchemeType().getTransactionType())) {
                //We have got non repeating scheme type

                if (schemeSummaryModelToProcess.getAmountType().equals(AmountType.PERCENTAGE)) {
                    percentageToPay += schemeSummaryModelToProcess.getAmount();
                } else {
                    fixedToPay += schemeSummaryModelToProcess.getAmount();
                }
                payoutSchemeSummaryModelMap.put(schemeSummaryModelToProcess, new AmountModel(schemeSummaryModelToProcess.getAmount(), schemeSummaryModelToProcess.getAmountType()));

            } else {
                if (schemeSummaryModelToProcess.getAmountType().equals(AmountType.PERCENTAGE)) {
                    //Check for rejected CashDiscounts dont continue if its ever rejected once
                    if (SchemeType.CDS.contains(schemeSummaryModelToProcess.getSchemeType())) {
                        List<SchemeInOut> cdSchemeInOuts = schemeInOutRepository.selectAllByType(schemeSummaryModelToProcess.getSchemeType(), inventoryItem.getId());
                        LOGGER.info(cdSchemeInOuts);
                        if (!cdSchemeInOuts.isEmpty() && cdSchemeInOuts.get(0).getStatusDescription().equals("Rejected due to free days availed"))
                            continue;
                    }
                    percentageToPay += schemeSummaryModelToProcess.getAmount();
                } else {
                    fixedToPay += schemeSummaryModelToProcess.getAmount();
                }
                payoutSchemeSummaryModelMap.put(schemeSummaryModelToProcess, new AmountModel(schemeSummaryModelToProcess.getAmount(), schemeSummaryModelToProcess.getAmountType()));
            }
        }
        double walletCredit = 0d;
        if (fixedToPay > 0) {
            double fixedRollout = fixedToPay * (100 / (100 + inventoryPayoutModel.getPercentageAmount()));
            for (Map.Entry<SchemeSummaryModel, AmountModel> schemeSummaryModelAmountModelEntry : payoutSchemeSummaryModelMap.entrySet()) {
                SchemeSummaryModel schemeSummaryModel = schemeSummaryModelAmountModelEntry.getKey();
                AmountModel amountModel = schemeSummaryModelAmountModelEntry.getValue();
                if (amountModel.getAmountType().equals(AmountType.FIXED)) {
                    SchemeInOut sio = new SchemeInOut();
                    sio.setSchemeId(schemeSummaryModel.getSchemeId());
                    sio.setInventoryItemId(inventoryItem.getId());
                    sio.setStatus(SchemePayoutStatus.CREDITED);
                    sio.setCreditTimestamp(LocalDateTime.now());
                    sio.setAmount((float) (fixedRollout * amountModel.getAmount() / fixedToPay) + schemeTypeCancelledAmountMap.getOrDefault(schemeSummaryModel.getSchemeType(), 0f));
                    if (schemeSummaryModel.getSchemeType().getTransactionType().equals(StockTransactionType.IN))
                        sio.setStatusDescription("Credited for GRN of IMEI-" + inventoryItem.getSerialNumber());
                    else
                        sio.setStatusDescription("Credited for Sale of IMEI-" + inventoryItem.getSerialNumber());
                    schemeInOutRepository.persist(sio);
                }
            }
            walletCredit += fixedRollout;
        }

        if (percentageToPay > 0) {
            LOGGER.info("inventoryPayoutModel.getFixedAmount() ----> {}", inventoryPayoutModel.getFixedAmount());
            double effectiveDP = inventoryPayoutModel.getDp() - (inventoryPayoutModel.getFixedAmount() + fixedToPay);
            double totalPercentage = inventoryPayoutModel.getPercentageAmount() + percentageToPay;
            double percentageRollout = effectiveDP * (totalPercentage / (100 + totalPercentage) - (inventoryPayoutModel.getPercentageAmount() / (100 + inventoryPayoutModel.getPercentageAmount())));
            for (Map.Entry<SchemeSummaryModel, AmountModel> schemeSummaryModelAmountModelEntry : payoutSchemeSummaryModelMap.entrySet()) {
                SchemeSummaryModel schemeSummaryModel = schemeSummaryModelAmountModelEntry.getKey();
                AmountModel amountModel = schemeSummaryModelAmountModelEntry.getValue();
                if (amountModel.getAmountType().equals(AmountType.PERCENTAGE)) {
                    SchemeInOut sio = new SchemeInOut();
                    sio.setInventoryItemId(inventoryItem.getId());
                    sio.setSchemeId(schemeSummaryModel.getSchemeId());
                    sio.setStatus(SchemePayoutStatus.CREDITED);
                    sio.setCreditTimestamp(LocalDateTime.now());
                    sio.setAmount((float) (percentageRollout * amountModel.getAmount() / percentageToPay) +
                            schemeTypeCancelledAmountMap.getOrDefault(schemeSummaryModel.getSchemeType(), 0f));
                    if (schemeSummaryModel.getSchemeType().getTransactionType().equals(StockTransactionType.IN))
                        sio.setStatusDescription("Credited for GRN of IMEI-" + inventoryItem.getSerialNumber());
                    else
                        sio.setStatusDescription("Credited for Sale of IMEI-" + inventoryItem.getSerialNumber());
                    schemeInOutRepository.persist(sio);
                }
            }
            walletCredit += percentageRollout;
        }

        return (float) walletCredit;
    }

    private Set<Integer> filterImeisByAgeing(Set<Integer> inventoryItemIds, FofoOrder fofoOrder) {
        Set<Integer> filteredInventoryIds = new HashSet<>();
        List<PartnerAgeingModel> partnerAgeingModels = ageingService.filterAgedInventory(inventoryItemIds);

        // Batch fetch all Samsung activated imeis at once to avoid N+1 queries
        List<String> samsungSerialNumbers = partnerAgeingModels.stream()
                .filter(p -> "Samsung".equalsIgnoreCase(p.getBrand()))
                .map(PartnerAgeingModel::getSerialNumber)
                .collect(Collectors.toList());
        Map<String, ActivatedImei> activatedImeiMap = samsungSerialNumbers.isEmpty()
                ? Collections.emptyMap()
                : activatedImeiRepository.selectBySerialNumbers(samsungSerialNumbers).stream()
                        .collect(Collectors.toMap(ActivatedImei::getSerialNumber, x -> x, (a, b) -> a));

        for (PartnerAgeingModel partnerAgeingModel : partnerAgeingModels) {
            LOGGER.info("Serial Number - {}", partnerAgeingModel.getSerialNumber());
            if (partnerAgeingModel.getBrand().equalsIgnoreCase("Samsung")) {
                ActivatedImei activatedImei = activatedImeiMap.get(partnerAgeingModel.getSerialNumber());
                if (activatedImei != null && activatedImei.getActivationTimestamp().toLocalDate()
                        .isBefore(partnerAgeingModel.getBillingDate().plusDays(partnerAgeingModel.getMaxAgeingDays()))) {
                    //Lets give money if activation is there and is before the ageing limit of that brand.
                    filteredInventoryIds.add(partnerAgeingModel.getInventoryItemId());
                } else {
                    //If billing happens before ageing expiry
                    if (fofoOrder.getCreateTimestamp().toLocalDate().isBefore(partnerAgeingModel.getBillingDate().plusDays(partnerAgeingModel.getMaxAgeingDays()))) {
                        filteredInventoryIds.add(partnerAgeingModel.getInventoryItemId());
                    }
                }
            } else {
                filteredInventoryIds.add(partnerAgeingModel.getInventoryItemId());
            }
        }
        return filteredInventoryIds;
    }

    @Autowired
    WarehouseInventoryItemRepository warehouseInventoryItemRepository;

    @Override
    public float processSchemeOut(int fofoOrderId, int retailerId) throws ProfitMandiBusinessException {

        float totalCashback = 0;
        FofoOrder fofoOrder = fofoOrderRepository.selectByFofoIdAndOrderId(retailerId, fofoOrderId);
        // Process only if order is not cancelled
        if (fofoOrder.getCancelledTimestamp() == null) {
            // PartnerType partnerType = partnerTypeChangeService.getTypeOnDate(retailerId,
            // fofoOrder.getCreateTimestamp().toLocalDate());
            // TODO - SCHEME
            PartnerType partnerType = partnerTypeChangeService.getTypeOnMonth(retailerId,
                    YearMonth.from(fofoOrder.getCreateTimestamp()));
            LOGGER.info("OrderDate - {}, Partner Type - {}", fofoOrder.getCreateTimestamp(), partnerType);

            List<ScanRecord> scanRecords = scanRecordRepository.selectAllByOrderId(fofoOrderId);
            if (scanRecords.size() == 0) return 0;
            Set<Integer> inventoryItemIds = scanRecords.stream().map(x -> x.getInventoryItemId())
                    .collect(Collectors.toSet());
            //Check for ageing
            //inventoryItemIds = this.filterImeisByAgeing(inventoryItemIds, fofoOrder);
            //ageingService.filterAgedInventory(inventoryItemIds);
            LOGGER.info("fofoOrderId --- {}", fofoOrderId);
            LOGGER.info("scanRecords --- {}", scanRecords);
            LOGGER.info("inventoryItemIds --- {}", inventoryItemIds);

            if (inventoryItemIds.size() == 0) return 0;

            // Cache blocked imeis to avoid repeated DB calls inside stream
            Set<String> blockedImeisSet = new HashSet<>(this.getBlockedImeis());

            Set<InventoryItem> inventoryItems = inventoryItemRepository.selectByIds(inventoryItemIds).stream()
                    .filter(x -> x.getSerialNumber() != null && !x.getSerialNumber().equals(""))
                    .collect(Collectors.toSet());
            inventoryItems = inventoryItems.stream().filter(inventoryItem -> !blockedImeisSet.contains(inventoryItem.getSerialNumber())).collect(Collectors.toSet());

            //Do not consider imei above 90 days for samsung
            List<String> samsungSerialNumbers = inventoryItems.stream().filter(x -> x.getItem().getBrand().equalsIgnoreCase("samsung")).map(x -> x.getSerialNumber()).collect(Collectors.toList());

            if (samsungSerialNumbers.size() > 0) {
                List<AgeingSummaryModel> billedImeiModels = warehouseInventoryItemRepository.findStockAgeingByFofoIdSerialNumbers(retailerId, samsungSerialNumbers);
                List<String> agedSerialNumbers = billedImeiModels.stream().filter(x -> x.isAgedAbove(365)).map(x -> x.getSerialNumber()).collect(Collectors.toList());
                if (agedSerialNumbers.size() > 0) {
                    List<String> samsungExceptionsSerialNumbers = samsungExceptionRepository.selectAllBySerialNumber(agedSerialNumbers).stream().map(x -> x.getSerialNumber()).collect(Collectors.toList());
                    agedSerialNumbers.removeAll(samsungExceptionsSerialNumbers);
                }
                inventoryItems = inventoryItems.stream().filter(x -> !agedSerialNumbers.contains(x.getSerialNumber())).collect(Collectors.toSet());
            }

            if (inventoryItems.size() == 0) return 0;

            Map<Integer, List<InventoryItem>> catalogInventoryItemMap = inventoryItems.stream().collect(Collectors.groupingBy(x -> x.getItem().getCatalogItemId()));
            Map<CatalogSummaryModel, List<SchemeSummaryModel>> catalogSchemeSummaryMap = tagListingRepository.getModelSchemesByCatalogIdsAndType(retailerId,
                    partnerType, new ArrayList<>(catalogInventoryItemMap.keySet()), fofoOrder.getCreateTimestamp());

            LOGGER.info("catalogSchemeSummaryMap - {}", catalogSchemeSummaryMap);

            // N+1 fix: Batch fetch all SchemeInOut records for all inventoryItems at once
            Set<Integer> allInventoryItemIds = inventoryItems.stream().map(InventoryItem::getId).collect(Collectors.toSet());
            List<SchemeInOut> allSchemeInOuts = schemeInOutRepository.selectByInventoryItemIds(allInventoryItemIds);
            Map<Integer, List<SchemeInOut>> schemeInOutByInventoryItemId = allSchemeInOuts.stream()
                    .collect(Collectors.groupingBy(SchemeInOut::getInventoryItemId));

            int count = 0;
            for (Map.Entry<CatalogSummaryModel, List<SchemeSummaryModel>> catalogSummaryModelListEntry : catalogSchemeSummaryMap.entrySet()) {
                CatalogSummaryModel catalogSummaryModel = catalogSummaryModelListEntry.getKey();
                List<SchemeSummaryModel> schemeSummaryModels = catalogSummaryModelListEntry.getValue().stream().filter(x -> x != null).collect(Collectors.toList());

                schemeSummaryModels.stream().filter(x -> x.getSchemeType().getTransactionType().equals(StockTransactionType.OUT)).forEach(x -> x.setProcess(true));
                if (schemeSummaryModels.stream().filter(x -> x.isProcess()).count() == 0) continue;

                // Create map once per catalog instead of per inventoryItem
                Map<Integer, SchemeSummaryModel> schemeSummaryModelMap = schemeSummaryModels.stream().collect(Collectors.toMap(x -> x.getSchemeId(), x -> x));

                List<InventoryItem> modelInventoryItems = catalogInventoryItemMap.get(catalogSummaryModel.getCatalogId());
                for (InventoryItem inventoryItem : modelInventoryItems) {
                    // N+1 fix: Use pre-fetched schemeInOut map instead of querying per inventoryItem
                    List<SchemeInOut> sios = schemeInOutByInventoryItemId.getOrDefault(inventoryItem.getId(), Collections.emptyList());

                    List<Integer> creditedSchemeIds = sios.stream()
                            .filter(x -> x.getStatus().equals(SchemePayoutStatus.CREDITED))
                            .map(x -> x.getSchemeId()).collect(Collectors.toList());

                    double sioRejectedValue = 0;
                    if (creditedSchemeIds.size() > 0) {
                        //Also ignore special support
                        //This code block needs to be deleted
                        List<Integer> schemeIdsToReject = null;
                        if (Arrays.asList(1025588, 1025589).contains(inventoryItem.getItem().getCatalogItemId())) {
                            schemeIdsToReject = schemeRepository.selectBySchemeIds(creditedSchemeIds).stream()
                                    .filter(x -> !SchemeType.SPECIAL_SUPPORT.equals(x.getType()))
                                    .map(x -> x.getId()).collect(Collectors.toList());
                            //List<Integer> schemeIdsToReject = schemeRepository.selectBySchemeIds(creditedSchemeIds).stream()
                            //This code block needs to be deleted
                        } else {
                            schemeIdsToReject = schemeRepository.selectBySchemeIds(creditedSchemeIds).stream()
                                    .filter(x -> SchemeType.OUT_TYPES.contains(x.getType()) && !SchemeType.SPECIAL_SUPPORT.equals(x.getType()))
                                    .map(x -> x.getId()).collect(Collectors.toList());
                            //Reject invalid scheme payouts due to upgrade in Category or any change in schemes historically
                            //Lets not touchs
                        }
                        for (SchemeInOut sio : sios) {
                            if (schemeIdsToReject.contains(sio.getSchemeId()) && !schemeSummaryModelMap.containsKey(sio.getSchemeId())) {
                                // Removed unused schemeRepository.selectById call
                                sio.setStatusDescription("Rolledback due to Category upgrade/invalid scheme");
                                sio.setStatus(SchemePayoutStatus.REJECTED);
                                sio.setRolledBackTimestamp(LocalDateTime.now());
                                sioRejectedValue += sio.getAmount();
                            }
                        }
                    }
                    float inventoryItemCashback = this.createSchemeInOut(schemeSummaryModels, inventoryItem);
                    if (inventoryItemCashback > 0 || sioRejectedValue > 0) {
                        count++;
                        totalCashback += inventoryItemCashback - sioRejectedValue;
                    }
                }
            }

            if (count > 0) {
                walletService.addAmountToWallet(
                        retailerId, fofoOrderId, WalletReferenceType.SCHEME_OUT, "Sales margin for invoice number "
                                + fofoOrder.getInvoiceNumber() + ". Total " + count + " pc(s)",
                        totalCashback, fofoOrder.getCreateTimestamp());
                fofoOrder.setCashback(totalCashback + fofoOrder.getCashback());
            }
        }
        return totalCashback;
    }

    @Override
    //Tax rate has been passed to 0 to ensure no tax deduction
    public float getSpecialSupportAmount(float supportAmount, PartnerType partnerType, LocalDate onDate,
                                         int catalogId) throws ProfitMandiBusinessException {
        //int itemId = itemRepository.selectAllByCatalogItemId(catalogId).stream().findAny().get().getId();
        //float totalTaxRate = stateGstRateRepository.getTotalTaxRate(itemId);
        return this.getSpecialSupportAmount(supportAmount, partnerType, onDate, catalogId, 0);
    }

    @Override
    public float getSpecialSupportAmount(float supportAmount, PartnerType partnerType, LocalDate onDate,
                                         int catalogId, float taxRate) throws ProfitMandiBusinessException {
        float totalMargin = this.selectPercentageScheme(partnerType, onDate, catalogId, false, 0, 0).stream().collect(Collectors.summingDouble(x -> x.getAmount())).floatValue();
        float amountToCredit = supportAmount * (1 - (totalMargin / (100 + taxRate)));
        return amountToCredit;
    }

    @Override
    public void rollbackSchemes(List<Integer> inventoryItemIds, String rollbackReason)
            throws Exception {
        List<InventoryItem> inventoryItems = inventoryItemRepository.selectAllByIds(inventoryItemIds);
        Map<Integer, InventoryItem> inventoryItemMap = inventoryItems.stream().collect(Collectors.toMap(x -> x.getId(), x -> x));
        Map<Integer, Integer> purchasePartnerMap = inventoryItems.stream().collect(Collectors.toMap(x -> x.getPurchaseId(), x -> x.getFofoId(), (u, v) -> u));

        LOGGER.info("inventoryItemIds - {}", inventoryItemIds);
        List<SchemeInOut> schemeInOuts = schemeInOutRepository.selectByInventoryItemIds(new HashSet<>(inventoryItemIds));
        List<Integer> schemeIds = schemeInOuts.stream().map(x -> x.getSchemeId()).distinct().collect(Collectors.toList());
        if (schemeIds.size() == 0) return;
        List<Scheme> schemes = schemeRepository.selectBySchemeIds(schemeIds);
        Map<Integer, Float> inSchemesMap = new HashMap<>();
        Map<Integer, Float> outSchemesMap = new HashMap<>();
        Map<Integer, Scheme> schemesMap = schemes.stream().collect(Collectors.toMap(x -> x.getId(), x -> x));
        for (SchemeInOut schemeInOut : schemeInOuts) {
            Map<Integer, Float> schemePayoutMap;
            int inventoryItemId = schemeInOut.getInventoryItemId();
            if (schemeInOut.getRolledBackTimestamp() == null) {
                schemeInOut.setRolledBackTimestamp(LocalDateTime.now());
                if (schemeInOut.getStatus() == null || schemeInOut.getStatus().equals(SchemePayoutStatus.CREDITED)) {
                    Scheme scheme = schemesMap.get(schemeInOut.getSchemeId());
                    if (scheme.getType().getTransactionType().equals(StockTransactionType.IN)) {
                        schemePayoutMap = inSchemesMap;
                    } else {
                        schemePayoutMap = outSchemesMap;
                    }
                    if (!schemePayoutMap.containsKey(inventoryItemId)) {
                        schemePayoutMap.put(inventoryItemId, 0f);
                    }
                    schemePayoutMap.put(inventoryItemId, inSchemesMap.get(inventoryItemId) == null ? 0 : inSchemesMap.get(inventoryItemId) + schemeInOut.getAmount());
                }
                schemeInOut.setStatus(SchemePayoutStatus.REJECTED);
                schemeInOut.setStatusDescription(rollbackReason);
            }
        }
        Map<Integer, Double> purchaseRollbackAmountMap = inSchemesMap.entrySet().stream().collect(Collectors.groupingBy(x -> inventoryItemMap.get(x.getKey()).getPurchaseId(), Collectors.summingDouble(x -> x.getValue())));

        for (Map.Entry<Integer, Double> purchaseRollbackAmountEntry : purchaseRollbackAmountMap.entrySet()) {
            int purchaseId = purchaseRollbackAmountEntry.getKey();
            Double amountToRollback = purchaseRollbackAmountEntry.getValue();
            if (amountToRollback != null && amountToRollback > 0) {
                walletService.rollbackAmountFromWallet(purchasePartnerMap.get(purchaseId), amountToRollback.floatValue(), purchaseId,
                        WalletReferenceType.SCHEME_IN, rollbackReason, LocalDateTime.now());
            }
        }
    }

    @Override
    public void restoreSchemes(List<Integer> inventoryItemIds, String debitNoteNumber) throws Exception {
        String rollbackDescription = "Schemes rolled back for DebitNote #" + debitNoteNumber;

        List<InventoryItem> inventoryItems = inventoryItemRepository.selectAllByIds(inventoryItemIds);
        Map<Integer, InventoryItem> inventoryItemMap = inventoryItems.stream()
                .collect(Collectors.toMap(InventoryItem::getId, x -> x));
        Map<Integer, Integer> purchasePartnerMap = inventoryItems.stream()
                .collect(Collectors.toMap(InventoryItem::getPurchaseId, InventoryItem::getFofoId, (u, v) -> u));

        List<SchemeInOut> schemeInOuts = schemeInOutRepository.selectByInventoryItemIds(new HashSet<>(inventoryItemIds));
        List<Integer> schemeIds = schemeInOuts.stream().map(SchemeInOut::getSchemeId).distinct()
                .collect(Collectors.toList());
        if (schemeIds.isEmpty()) return;

        List<Scheme> schemes = schemeRepository.selectBySchemeIds(schemeIds);
        Map<Integer, Scheme> schemesMap = schemes.stream().collect(Collectors.toMap(Scheme::getId, x -> x));
        Map<Integer, Float> inSchemesRestoreMap = new HashMap<>();

        for (SchemeInOut schemeInOut : schemeInOuts) {
            if (schemeInOut.getStatus() != null
                    && schemeInOut.getStatus().equals(SchemePayoutStatus.REJECTED)
                    && rollbackDescription.equals(schemeInOut.getStatusDescription())) {

                Scheme scheme = schemesMap.get(schemeInOut.getSchemeId());
                if (scheme != null && scheme.getType().getTransactionType().equals(StockTransactionType.IN)) {
                    int inventoryItemId = schemeInOut.getInventoryItemId();
                    inSchemesRestoreMap.merge(inventoryItemId, schemeInOut.getAmount(), Float::sum);
                }

                schemeInOut.setStatus(SchemePayoutStatus.CREDITED);
                schemeInOut.setStatusDescription("Restored: DN rejection reversal #" + debitNoteNumber);
                schemeInOut.setRolledBackTimestamp(null);
            }
        }

        Map<Integer, Double> purchaseRestoreAmountMap = inSchemesRestoreMap.entrySet().stream()
                .collect(Collectors.groupingBy(
                        x -> inventoryItemMap.get(x.getKey()).getPurchaseId(),
                        Collectors.summingDouble(Map.Entry::getValue)));

        String restoreReason = "Scheme restored: DN rejection reversal #" + debitNoteNumber;
        for (Map.Entry<Integer, Double> entry : purchaseRestoreAmountMap.entrySet()) {
            int purchaseId = entry.getKey();
            Double amountToRestore = entry.getValue();
            if (amountToRestore != null && amountToRestore > 0) {
                walletService.addAmountToWallet(purchasePartnerMap.get(purchaseId), purchaseId,
                        WalletReferenceType.SCHEME_IN, restoreReason,
                        amountToRestore.floatValue(), LocalDateTime.now());
            }
        }
    }

    @Override
    public Map<String, Object> getSchemes(Set<Integer> roleIds, int offset, int limit)
            throws ProfitMandiBusinessException {
        Map<String, Object> map = new HashMap<>();
        List<Scheme> schemes = null;
        long size = 0;
        if (roleManager.isAdmin(roleIds)) {
            schemes = schemeRepository.selectAll(offset, limit);
            size = schemeRepository.selectAllCount();
        } else {
            schemes = schemeRepository.selectActiveAll(offset, limit);
            size = schemeRepository.selectAllActiveCount();
        }
        map.put("schemes", schemes);
        map.put("start", offset + 1);
        map.put("size", size);
        if (schemes.size() < limit) {
            map.put("end", offset + schemes.size());
        } else {
            map.put("end", offset + limit);
        }
        return map;
    }

    @Override
    public List<Scheme> getPaginatedSchemes(Set<Integer> roleIds, int offset, int limit)
            throws ProfitMandiBusinessException {
        LOGGER.info("requested offset=[{}], limit = [{}]", offset, limit);
        List<Scheme> schemes = null;
        if (roleManager.isAdmin(roleIds)) {
            schemes = schemeRepository.selectAll(offset, limit);
        } else {
            schemes = schemeRepository.selectActiveAll(offset, limit);
        }
        return schemes;
    }

    @Override
    // This is being called to reverse schemes while processing price Drop
    public void reverseSchemes(List<InventoryItem> inventoryItems, int priceDropId, String reversalReason)
            throws ProfitMandiBusinessException {
        PriceDrop priceDrop = priceDropRepository.selectById(priceDropId);
        Map<Integer, List<InventoryItem>> purchaseInventoryListMap = inventoryItems.stream()
                .collect(Collectors.groupingBy(InventoryItem::getPurchaseId, Collectors.toList()));

        for (Map.Entry<Integer, List<InventoryItem>> purchaseEntry : purchaseInventoryListMap.entrySet()) {
            float amountToCredit = 0;
            float amountToDebit = 0;
            int purchaseId = purchaseEntry.getKey();
            List<InventoryItem> purchaseInventoryItemList = purchaseEntry.getValue();

            Map<Integer, InventoryItem> inventoryItemsMap = purchaseInventoryItemList.stream()
                    .collect(Collectors.toMap(x -> x.getId(), x -> x));

            List<SchemeInOut> schemeInOuts = schemeInOutRepository.selectByInventoryItemIds(inventoryItemsMap.keySet());
            LOGGER.info("Scheme InOuts , {}", schemeInOuts);
            if (schemeInOuts.size() == 0) {
                continue;
            }
            List<Integer> schemeIds = schemeInOuts.stream().map(x -> x.getSchemeId()).collect(Collectors.toList());
            Map<Integer, Scheme> schemesMap = schemeRepository.selectBySchemeIds(schemeIds, 0, schemeIds.size())
                    .stream().collect(Collectors.toMap(x -> x.getId(), x -> x));
            for (SchemeInOut schemeInOut : schemeInOuts) {
                InventoryItem ii = inventoryItemsMap.get(schemeInOut.getInventoryItemId());
                Scheme scheme = schemesMap.get(schemeInOut.getSchemeId());
                if (scheme.getAmountType().equals(AmountType.FIXED)) {
                    continue;
                }
                if (scheme.getType().getTransactionType().equals(StockTransactionType.IN) && schemeInOut.getRolledBackTimestamp() == null) {
                    schemeInOut.setRolledBackTimestamp(LocalDateTime.now());
                    schemeInOut.setStatus(SchemePayoutStatus.REJECTED);
                    schemeInOut.setStatusDescription("Margin reversed due to price drop");
                    // IF not credited then dont consider any credit/debit for that sio entry
                    if (schemeInOut.getCreditTimestamp() != null) {
                        amountToDebit += schemeInOut.getAmount();
                    }
                }
            }
            int fofoId = inventoryItems.get(0).getFofoId();
            if (amountToDebit > 0) {
                walletService.addAmountToWallet(fofoId, purchaseId, WalletReferenceType.SCHEME_IN,
                        MessageFormat.format(reversalReason, purchaseInventoryItemList.size()), -amountToDebit,
                        priceDrop.getAffectedOn());
            }
        }
    }

    @Autowired
    UserWalletRepository userWalletRepository;
    @Autowired
    UserWalletHistoryRepository userWalletHistoryRepository;

    @Override
    // Always being called from cancel order/bad return means no SCHEME IN is considered
    public void reverseSchemes(List<InventoryItem> inventoryItems, int reversalReference, String reversalReason,
                               List<SchemeType> schemeTypes) throws ProfitMandiBusinessException {
        Map<Integer, InventoryItem> inventoryItemsMap = inventoryItems.stream()
                .collect(Collectors.toMap(x -> x.getId(), x -> x));
        LOGGER.info("inventoryItems" + inventoryItems);

        Map<SchemeType, SchemeInOut> schemeTypeMap = new HashMap<>();

        List<SchemeInOut> schemeInOuts = schemeInOutRepository.selectByInventoryItemIds(inventoryItemsMap.keySet());
        List<SchemeInOut> rolledBacks = schemeInOuts.stream().filter(x -> x.getStatusDescription().equals(reversalReason)).collect(Collectors.toList());
        float amountToRollback = 0;

        if (!schemeInOuts.isEmpty()) {
            List<Integer> schemeIds = schemeInOuts.stream().map(x -> x.getSchemeId()).collect(Collectors.toList());
            LOGGER.info("schemeIds" + schemeIds);

            Map<Integer, Scheme> schemesMap = schemeRepository.selectBySchemeIds(schemeIds, 0, schemeIds.size())
                    .stream().collect(Collectors.toMap(x -> x.getId(), x -> x));
            if (rolledBacks.size() > 0) {
                for (SchemeInOut schemeInOut : rolledBacks) {
                    Scheme scheme = schemesMap.get(schemeInOut.getSchemeId());
                    if (schemeTypes.contains(scheme.getType())) {
                        schemeTypeMap.put(scheme.getType(), schemeInOut);
                        if (schemeInOut.getCreditTimestamp() != null) {
                            amountToRollback += schemeInOut.getAmount();
                        }
                    }
                }
            } else {
                for (SchemeInOut schemeInOut : schemeInOuts) {
                    Scheme scheme = schemesMap.get(schemeInOut.getSchemeId());
                    if (schemeTypes.contains(scheme.getType())) {
                        if (schemeInOut.getRolledBackTimestamp() == null) {
                            schemeInOut.setRolledBackTimestamp(LocalDateTime.now());
                            if (schemeInOut.getStatus().equals(SchemePayoutStatus.CREDITED)) {
                                amountToRollback += schemeInOut.getAmount();
                            }
                            schemeInOut.setStatus(SchemePayoutStatus.REJECTED);
                            schemeInOut.setStatusDescription(reversalReason);
                        }
                    }
                }
            }
        }

        int fofoId = inventoryItems.get(0).getFofoId();
        WalletReferenceType walletReferenceType = schemeTypes.containsAll(SchemeType.OUT_SCHEME_TYPES) ? WalletReferenceType.SCHEME_OUT
                : schemeTypes.contains(SchemeType.SPECIAL_SUPPORT) ? WalletReferenceType.SPECIAL_SUPPORT
                : schemeTypes.contains(SchemeType.INVESTMENT) ? WalletReferenceType.SCHEME_OUT : null;
        List<UserWalletHistory> userWalletHistoryList = null;
        if (amountToRollback > 0 && walletReferenceType != null) {
            // Mark appropriate reference of rollback investment margin
            if (schemeTypes.contains(SchemeType.INVESTMENT) && schemeTypeMap.containsKey(SchemeType.INVESTMENT)) {
                LocalDateTime creditTime = schemeTypeMap.get(SchemeType.INVESTMENT).getCreditTimestamp();
                if (creditTime == null) return;
                int investmentReversalReference = Integer.parseInt(FormattingUtils.getYearMonth(creditTime.minusMonths(1)));
                UserWallet userWallet = userWalletRepository.selectByRetailerId(fofoId);
                userWalletHistoryList = userWalletHistoryRepository.selectAllByreferenceIdandreferenceType(userWallet.getUserId(), investmentReversalReference, WalletReferenceType.INVESTMENT_PAYOUT);
                if (userWalletHistoryList.size() > 0) {
                    walletReferenceType = WalletReferenceType.INVESTMENT_PAYOUT;
                } else {
                    userWalletHistoryList = null;
                }
            }
            if (userWalletHistoryList == null) {
                userWalletHistoryList = userWalletHistoryRepository.selectAllByreferenceIdandreferenceType(reversalReference, walletReferenceType);
            }
            if (userWalletHistoryList.size() > 0) {
                int maxDeductible = userWalletHistoryList.stream().collect(Collectors.summingInt(x -> x.getAmount()));
                if (maxDeductible > 0) {
                    LOGGER.info("----------> maxDeductible {}, amountToRollback {}, reversalReference {}, walletReferenceType {} ", maxDeductible, amountToRollback, reversalReference, walletReferenceType);
                    walletService.rollbackAmountFromWallet(fofoId, Math.min(maxDeductible, amountToRollback), reversalReference, walletReferenceType,
                            reversalReason, LocalDateTime.now());
                }

            }
        }
    }


    @Override
    public double getTotalMargin(int itemId, PartnerType partnerType, LocalDateTime dateTime) {
        Session session = sessionFactory.getCurrentSession();
        CriteriaBuilder cb = session.getCriteriaBuilder();
        CriteriaQuery<Double> criteriaQuery = cb.createQuery(Double.class);
        Root<SchemeItem> schemeItem = criteriaQuery.from(SchemeItem.class);
        Root<Scheme> scheme = criteriaQuery.from(Scheme.class);
        Predicate schemePredicate = cb.equal(scheme.get(ProfitMandiConstants.AMOUNT_TYPE), AmountType.PERCENTAGE);
        Predicate lessThanPredicate = cb.lessThanOrEqualTo(scheme.get(ProfitMandiConstants.END_DATE_TIME), dateTime);
        Predicate greaterThanPredicate = cb.greaterThanOrEqualTo(scheme.get(ProfitMandiConstants.START_DATE_TIME),
                dateTime);
        Predicate joinPredicate = cb.equal(scheme.get("id"), schemeItem.get("schemeId"));
        Predicate schemeItemPredicate = cb.equal(schemeItem.get(ProfitMandiConstants.ITEM_ID), itemId);
        criteriaQuery.select(cb.sum(scheme.get(ProfitMandiConstants.AMOUNT))).where(schemePredicate, lessThanPredicate,
                greaterThanPredicate, schemeItemPredicate, joinPredicate);

        Query<Double> query = session.createQuery(criteriaQuery);
        return query.getSingleResult() + ProfitMandiConstants.SCHEME_INVESTMENT_MARGIN;

    }


    @Override
    public Map<Integer, Float> getCatalogSchemeCashBack(int fofoId, List<Integer> catalogIds) throws
            ProfitMandiBusinessException {
        PartnerType partnerType = partnerTypeChangeService.getTypeOnDate(fofoId, LocalDate.now());
        Map<CatalogSummaryModel, List<SchemeSummaryModel>> catalogModelMap = tagListingRepository.getModelSchemesByCatalogIdsAndType(fofoId, partnerType, catalogIds, LocalDate.now().atStartOfDay());

        Map<Integer, Float> catalogCashbackMap = new HashMap<>();
        for (Map.Entry<CatalogSummaryModel, List<SchemeSummaryModel>> catalogSummaryModelListEntry : catalogModelMap.entrySet()) {
            int catalogItemId = catalogSummaryModelListEntry.getKey().getCatalogId();
            List<SchemeSummaryModel> schemeSummaryModels = catalogSummaryModelListEntry.getValue();

            float totalCashback = schemeSummaryModels.stream()
                    .filter(x -> Arrays.asList(SchemeType.SPECIAL_SUPPORT, SchemeType.SELLOUT).contains(x.getSchemeType())
                            && x.getAmountType().equals(AmountType.FIXED))
                    .collect(Collectors.summingDouble(x -> x.getAmount())).floatValue();
            catalogCashbackMap.put(catalogItemId, totalCashback);
        }
        return catalogCashbackMap;
    }

    @Override
    public List<Scheme> selectSchemeByPartnerTypeFofoId(PartnerType partnerType, LocalDate onDate, int catalogId,
                                                        int fofoId, int offset, int limit) throws ProfitMandiBusinessException {
        Session session = sessionFactory.getCurrentSession();
        final TypedQuery<Scheme> typedQuery = session.createNamedQuery(
                "Scheme.selectSchemeByModelsPartnerTypeFofoId", Scheme.class);
        typedQuery.setParameter("catalogIds", Arrays.asList(catalogId));
        typedQuery.setParameter("fofoIds", Arrays.asList(fofoId, 0));
        typedQuery.setParameter("onDate", onDate.atStartOfDay());
        typedQuery.setParameter("partnerTypes", Arrays.asList(partnerType, partnerType.ALL));
        typedQuery.setFirstResult(offset);
        if (limit != 0) {
            typedQuery.setMaxResults(limit);
        }
        return typedQuery.getResultList();
    }

    @Override
    public void processSchemeIn(List<InventoryItem> inventoryItems) throws ProfitMandiBusinessException {
        Map<Integer, List<InventoryItem>> purchaseIdInventoryItemsMap = inventoryItems.stream().collect(Collectors.groupingBy(x -> x.getPurchaseId()));
        for (Map.Entry<Integer, List<InventoryItem>> purchaseIdInventoryItemEntry : purchaseIdInventoryItemsMap.entrySet()) {
            int retailerId = purchaseIdInventoryItemEntry.getValue().get(0).getFofoId();
            this.processSchemeIn(purchaseIdInventoryItemEntry.getKey(), retailerId);
        }
    }

    @Autowired
    FofoStoreRepository fofoStoreRepository;


    @Override
    @Cacheable(value = "staticscheme", cacheManager = "oneDayCacheManager")
    public Scheme getStaticScheme(int fofoId) throws ProfitMandiBusinessException {
        FofoStore fofoStore = fofoStoreRepository.selectByRetailerId(fofoId);
        Scheme scheme = null;
        if (fofoStore.getTarget() > 0) {
            scheme = new Scheme();
            scheme.setName("Super Retailer - Club 4");
            scheme.setStartDateTime(LocalDate.of(2025, 8, 1).atStartOfDay());
            scheme.setEndDateTime(LocalDate.of(2026, 01, 20).atTime(LocalTime.MAX));
            scheme.setTarget(fofoStore.getTarget());

            if (scheme.getEndDateTime().plusDays(5).isBefore(LocalDateTime.now())) return null;
        }
        return scheme;
    }

    @Override
    public List<Scheme> selectSchemeByPartnerType(PartnerType partnerType, LocalDate onDate, int catalogId,
                                                  boolean isAdmin, int offset, int limit) throws ProfitMandiBusinessException {
        Session session = sessionFactory.getCurrentSession();
        List<Predicate> andPredicates = new ArrayList<>();
        CriteriaBuilder cb = session.getCriteriaBuilder();
        CriteriaQuery<Scheme> query = cb.createQuery(Scheme.class);
        Root<Scheme> scheme = query.from(Scheme.class);
        if (!partnerType.equals(PartnerType.ALL)) {
            List<PartnerType> pt = new ArrayList<>();
            pt.add(PartnerType.ALL);
            pt.add(partnerType);
            andPredicates.add(cb.in(scheme.get("partnerType")).value(pt));
        }
        cb.desc(cb.isNull(scheme.get("expireTimestamp")));
        if (catalogId > 0) {

            List<Integer> schemeIds = schemeItemRepository.selectSchemeIdByCatalogId(catalogId);
            LOGGER.info("schemeId" + schemeIds);
            if (schemeIds.isEmpty()) {
                return new ArrayList<>();
            }
            andPredicates.add(cb.in(scheme.get("id")).value(schemeIds));
            if (onDate != null) {
                andPredicates.add(cb.greaterThan(scheme.get("endDateTime"), onDate.atStartOfDay()));
                andPredicates.add(cb.lessThanOrEqualTo(scheme.get("startDateTime"), onDate.atStartOfDay()));
            }
        }
        if (!isAdmin) {
            andPredicates.add(cb.isNotNull(scheme.get("activeTimestamp")));
        }
        query.where(cb.and(andPredicates.toArray(new Predicate[0])));
        query.orderBy(cb.desc(cb.function("isnull", Boolean.class, scheme.get("expireTimestamp"))));
        if (limit == 0) {
            return session.createQuery(query).setFirstResult(offset).getResultList();
        }
        return session.createQuery(query).setFirstResult(offset).setMaxResults(limit).getResultList();

    }

    @Override
    public List<Scheme> selectPercentageScheme(PartnerType partnerType, LocalDate onDate, int catalogId,
                                               boolean isAdmin, int offset, int limit) throws ProfitMandiBusinessException {
        List<Scheme> schemes = this.selectSchemeByPartnerType(partnerType, onDate, catalogId, isAdmin, offset, limit);
        return schemes.stream().filter(x -> x.getAmountType().equals(AmountType.PERCENTAGE)).collect(Collectors.toList());
    }

    @Override
    public void processActivation() throws ProfitMandiBusinessException {
        List<SchemeInOut> pendingPayouts = schemeInOutRepository.selectAllPending();
        List<Integer> schemeIds = new ArrayList<>();
        Set<Integer> inventoryIds = new HashSet<>();
        for (SchemeInOut pendingPayout : pendingPayouts) {
            schemeIds.add(pendingPayout.getSchemeId());
        }
        Map<Integer, Scheme> schemesMap = schemeRepository.selectBySchemeIds(schemeIds, 0, 0).stream()
                .filter(x -> x.getType().equals(SchemeType.SPECIAL_SUPPORT))
                .collect(Collectors.toMap(x -> x.getId(), x -> x));
        pendingPayouts = pendingPayouts.stream().filter(x -> schemesMap.get(x.getSchemeId()) != null)
                .collect(Collectors.toList());

        for (SchemeInOut pendingPayout : pendingPayouts) {
            inventoryIds.add(pendingPayout.getInventoryItemId());
        }
        Map<Integer, InventoryItem> inventoryItemMap = inventoryItemRepository.selectByIds(inventoryIds).stream()
                .collect(Collectors.toMap(x -> x.getId(), x -> x));
        Map<String, InventoryItem> serialNumberMap = inventoryItemMap.values().stream()
                .collect(Collectors.toMap(x -> x.getSerialNumber(), x -> x));

        List<ActivatedImei> activatedImeis = activatedImeiRepository
                .selectBySerialNumbers(new ArrayList<>(serialNumberMap.keySet())).stream().collect(Collectors.toList());

        Map<String, ActivatedImei> activatedImeiMap = activatedImeis.stream()
                .collect(Collectors.toMap(x -> x.getSerialNumber().toLowerCase(), x -> x));
        for (SchemeInOut pendingPayout : pendingPayouts) {
            Scheme scheme = schemesMap.get(pendingPayout.getSchemeId());
            InventoryItem ii = inventoryItemMap.get(pendingPayout.getInventoryItemId());
            String serialNumber = ii.getSerialNumber().toLowerCase();
            ActivatedImei activatedImei = activatedImeiMap.get(serialNumber);
            if (activatedImei == null) {
                continue;
            }
            if (scheme.isWithinRange(activatedImei.getActivationTimestamp())) {
                int fofoId = ii.getFofoId();
                // Get latest order Id
                int orderId = scanRecordRepository.selectByInventoryItemId(ii.getId()).stream()
                        .filter(x -> x.getOrderId() > 0)
                        .sorted(Comparator.comparing(ScanRecord::getCreateTimestamp).reversed()).findFirst().get()
                        .getOrderId();
                FofoOrder fofoOrder = fofoOrderRepository.selectByOrderId(orderId);

                InventoryPayoutModel inventoryPayoutModel = priceCircularService.getPayouts(ii);
                AmountModel amountModel = new AmountModel();
                amountModel.setAmount(scheme.getAmount());
                amountModel.setAmountType(scheme.getAmountType());
                double amountToRollout = inventoryPayoutModel.getRolloutAmount(amountModel);
                pendingPayout.setAmount((float) amountToRollout);
                walletService.addAmountToWallet(fofoId, orderId, WalletReferenceType.SPECIAL_SUPPORT,
                        "Special Support for " + ii.getItem().getItemDescriptionNoColor() + ", Imei - " + serialNumber, (float) amountToRollout,
                        fofoOrder.getCreateTimestamp());
                pendingPayout.setStatusDescription("Special support credited, activated on " + FormattingUtils.formatDate(activatedImei.getActivationTimestamp()));

                pendingPayout.setCreditTimestamp(LocalDateTime.now());
                pendingPayout.setStatus(SchemePayoutStatus.CREDITED);
            } else {
                pendingPayout.setStatus(SchemePayoutStatus.REJECTED);
                pendingPayout.setRolledBackTimestamp(LocalDateTime.now());
                pendingPayout.setStatusDescription(
                        "Rejected, activated on " + FormattingUtils.formatDate(activatedImei.getActivationTimestamp()));
            }
        }
    }

    @Override
    public void rejectActivatedSchemeIds(List<Integer> schemeIds) throws ProfitMandiBusinessException {
        List<SchemeInOut> schemeInOuts = schemeInOutRepository.selectBySchemeIds(new HashSet<>(schemeIds));
        // Filter to CREDITED records that haven't been rolled back
        List<SchemeInOut> creditedPayouts = schemeInOuts.stream()
                .filter(x -> SchemePayoutStatus.CREDITED.equals(x.getStatus()) && x.getRolledBackTimestamp() == null)
                .collect(Collectors.toList());
        if (creditedPayouts.isEmpty()) return;

        Map<Integer, Scheme> schemesMap = schemeRepository.selectBySchemeIds(schemeIds).stream()
                .filter(x -> SchemeType.SPECIAL_SUPPORT.equals(x.getType()))
                .collect(Collectors.toMap(x -> x.getId(), x -> x));
        creditedPayouts = creditedPayouts.stream().filter(x -> schemesMap.containsKey(x.getSchemeId()))
                .collect(Collectors.toList());
        if (creditedPayouts.isEmpty()) return;

        Set<Integer> inventoryIds = creditedPayouts.stream().map(SchemeInOut::getInventoryItemId).collect(Collectors.toSet());
        Map<Integer, InventoryItem> inventoryItemMap = inventoryItemRepository.selectByIds(inventoryIds).stream()
                .collect(Collectors.toMap(x -> x.getId(), x -> x));
        List<String> serialNumbers = inventoryItemMap.values().stream().map(InventoryItem::getSerialNumber).collect(Collectors.toList());
        Map<String, ActivatedImei> activatedImeiMap = activatedImeiRepository.selectBySerialNumbers(serialNumbers).stream()
                .collect(Collectors.toMap(x -> x.getSerialNumber().toLowerCase(), x -> x, (a, b) -> a));

        for (SchemeInOut payout : creditedPayouts) {
            Scheme scheme = schemesMap.get(payout.getSchemeId());
            InventoryItem ii = inventoryItemMap.get(payout.getInventoryItemId());
            if (ii == null) continue;
            ActivatedImei activatedImei = activatedImeiMap.get(ii.getSerialNumber().toLowerCase());
            if (activatedImei == null || activatedImei.getActivationTimestamp() == null) continue;

            if (!scheme.isWithinRange(activatedImei.getActivationTimestamp())) {
                payout.setStatus(SchemePayoutStatus.REJECTED);
                payout.setRolledBackTimestamp(LocalDateTime.now());
                payout.setStatusDescription("Rejected, activation on " + FormattingUtils.formatDate(activatedImei.getActivationTimestamp()) + " outside scheme period");

                List<ScanRecord> scanRecords = scanRecordRepository.selectByInventoryItemId(ii.getId());
                int orderId = scanRecords.stream()
                        .filter(x -> x.getOrderId() > 0)
                        .sorted(Comparator.comparing(ScanRecord::getCreateTimestamp).reversed()).findFirst()
                        .map(ScanRecord::getOrderId).orElse(0);
                if (orderId > 0) {
                    walletService.rollbackAmountFromWallet(ii.getFofoId(), payout.getAmount(), orderId,
                            WalletReferenceType.SPECIAL_SUPPORT,
                            "Rejected, activation on " + FormattingUtils.formatDate(activatedImei.getActivationTimestamp()) + " outside scheme period",
                            LocalDateTime.now());
                }
            }
        }
    }

    @Override
    public void processActivatedImeisForSchemes() throws ProfitMandiBusinessException {
        List<SchemesImeisModel> schemesImeisModels = schemeRepository.selectSelectUnpaidSchemes();
        LOGGER.info("Total Size - " + schemesImeisModels.size());
        List<Integer> orderIds = schemesImeisModels.stream().map(x -> x.getOrderId()).collect(Collectors.toList());
        List<FofoOrder> fofoOrders = fofoOrderRepository.selectAllByOrderIds(orderIds);
        Map<Integer, FofoOrder> validOrdersMap = fofoOrders.stream().filter(x -> x.getCancelledTimestamp() == null).collect(Collectors.toMap(x -> x.getId(), x -> x));
        Map<String, List<SchemesImeisModel>> validImeiSchemesModelMap = schemesImeisModels.stream().filter(x -> validOrdersMap.containsKey(x.getOrderId())).collect(Collectors.groupingBy(x -> x.getImei()));

        // Batch fetch all inventoryItems to avoid N+1 queries
        Set<Integer> inventoryItemIds = validImeiSchemesModelMap.values().stream()
                .map(list -> list.get(0).getInventoryItemId())
                .collect(Collectors.toSet());
        Map<Integer, InventoryItem> inventoryItemsMap = inventoryItemIds.isEmpty()
                ? Collections.emptyMap()
                : inventoryItemRepository.selectAllByIds(new ArrayList<>(inventoryItemIds)).stream()
                        .collect(Collectors.toMap(InventoryItem::getId, x -> x));

        for (Map.Entry<String, List<SchemesImeisModel>> imeiListEntry : validImeiSchemesModelMap.entrySet()) {
            SchemesImeisModel schemesImeisModel = imeiListEntry.getValue().get(0);
            List<Integer> schemeIds = imeiListEntry.getValue().stream().map(x -> x.getSchemeId()).collect(Collectors.toList());
            LOGGER.info("Serial Number  - {}, Scheme IDs - {}", schemesImeisModel.getImei(), schemeIds);
            InventoryItem inventoryItem = inventoryItemsMap.get(schemesImeisModel.getInventoryItemId());
            if (inventoryItem == null) continue;
            List<Scheme> schemes = schemeRepository.selectBySchemeIds(schemeIds);
            List<Scheme> supportSchemes = schemes.stream().filter(x -> Arrays.asList(SchemeType.SPECIAL_SUPPORT).contains(x.getType())).collect(Collectors.toList());
            if (supportSchemes.size() > 0) {
                for (Scheme scheme : supportSchemes) {
                    List<SchemeInOut> schemeInOuts = schemeInOutRepository.selectByScheme(scheme.getId(), inventoryItem.getId());
                    if (schemeInOuts.stream().filter(x -> Arrays.asList(SchemePayoutStatus.CREDITED, SchemePayoutStatus.PENDING).contains(x.getStatus())).count() > 0) {
                        continue;
                    }
                    SchemeInOut sio = new SchemeInOut();
                    sio.setAmount(0);
                    sio.setInventoryItemId(inventoryItem.getId());
                    sio.setSchemeId(scheme.getId());
                    sio.setStatusDescription("Activation pending for IMEI#" + inventoryItem.getSerialNumber());
                    sio.setStatus(SchemePayoutStatus.PENDING);
                    schemeInOutRepository.persist(sio);
                }
            }
        }
    }
}