Subversion Repositories SmartDukaan

Rev

Rev 36357 | Blame | Compare with Previous | Last modification | View Log | RSS feed

package com.spice.profitmandi.service.offers;

import com.spice.profitmandi.dao.entity.transaction.CronBatch;
import com.spice.profitmandi.dao.enumuration.catalog.OfferSchemeType;
import com.spice.profitmandi.dao.model.CreateOfferRequest;
import com.spice.profitmandi.service.cron.CronBatchService;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.LinkedHashMap;
import java.util.List;
import java.util.stream.Collectors;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Orchestrates offer processing with per-partner transaction isolation and batch tracking.
 *
 * NO @Transactional on this class or any of its methods — the orchestrator must not
 * carry an outer transaction. Each partner's write work runs in REQUIRES_NEW via
 * OfferProcessingHelper; batch metadata writes run in REQUIRES_NEW via CronBatchService.
 * If the caller is already inside a transaction (e.g., controller class-level @Transactional),
 * REQUIRES_NEW will suspend it cleanly.
 *
 * Thread-safety: duplicate-run prevention is two-layered — a JVM-local concurrent set
 * ({@link #inFlightOfferIds}) for same-process races, plus the persistent CronBatch
 * "running" record for cross-process/cross-restart detection.
 */
@Service
public class OfferBatchService {

    private static final Logger LOGGER = LogManager.getLogger(OfferBatchService.class);

    /**
     * Small fixed pool for fire-and-forget batch runs. Daemon threads so an
     * in-flight batch never blocks JVM shutdown (batch state recovery relies on
     * the persistent CronBatch record, not on the thread finishing).
     */
    private static final ExecutorService BATCH_EXECUTOR = Executors.newFixedThreadPool(3, r -> {
        Thread t = new Thread(r, "offer-batch-worker");
        t.setDaemon(true);
        return t;
    });

    /** JVM-level guard against concurrent submits for the same offerId. */
    private final Set<Integer> inFlightOfferIds = ConcurrentHashMap.newKeySet();

    @Autowired
    private OfferService offerService;

    @Autowired
    private OfferProcessingHelper offerProcessingHelper;

    @Autowired
    private CronBatchService cronBatchService;

    /**
     * @return true if a persisted batch for this offer is still in the running state.
     */
    public boolean hasUnfinishedBatch(int offerId) {
        return cronBatchService.findRunningForOffer(offerId) != null;
    }

    /**
     * Fire-and-forget: schedule the batch on a background worker and return immediately.
     * Prevents duplicate concurrent runs for the same offerId at the JVM level.
     * Used by HTTP controllers; cron CLI keeps using the sync processOfferWithBatch.
     *
     * @param offerId offer to process
     * @return a human-readable status message for the caller (submitted / duplicate / blocked)
     */
    public String submitBatchAsync(int offerId) {
        if (!inFlightOfferIds.add(offerId)) {
            LOGGER.info("Offer {} batch submit ignored — already in flight", offerId);
            return "Offer " + offerId + " is already being processed. Check the batch summary for progress.";
        }
        // From here on we own the in-flight guard. Ownership transfers to the worker
        // task only once submit() succeeds; on any earlier exit (unfinished batch found,
        // DB lookup failure, rejected execution) we MUST release it ourselves, otherwise
        // the offer would be blocked at the JVM level forever.
        boolean handedOffToWorker = false;
        try {
            CronBatch existing = cronBatchService.findRunningForOffer(offerId);
            if (existing != null) {
                LOGGER.info("Offer {} has unfinished batch {} (started {}); blocking reprocess",
                        offerId, existing.getId(), existing.getStartedAt());
                return "Offer " + offerId + " has an unfinished batch. Reprocessing is not allowed until the existing batch is fully processed.";
            }
            BATCH_EXECUTOR.submit(() -> {
                try {
                    processOfferWithBatch(offerId);
                } catch (Exception e) {
                    LOGGER.error("Offer {} batch processing failed: {}", offerId, e.getMessage(), e);
                } finally {
                    inFlightOfferIds.remove(offerId);
                }
            });
            handedOffToWorker = true;
        } finally {
            if (!handedOffToWorker) {
                inFlightOfferIds.remove(offerId);
            }
        }
        LOGGER.info("Offer {} batch submitted to background worker", offerId);
        return "Offer " + offerId + " submitted for processing. Check the batch summary for progress.";
    }

    /**
     * Synchronous entry point (used by the cron CLI and by the async worker).
     * Skips silently when a persisted batch for this offer is still running, or
     * when the offer request cannot be loaded.
     *
     * @throws Exception propagated from batch creation/finalization or per-partner setup
     */
    public void processOfferWithBatch(int offerId) throws Exception {
        CronBatch existing = cronBatchService.findRunningForOffer(offerId);
        if (existing != null) {
            LOGGER.info("Offer {} already running (batch {}, started {}); skipping duplicate run",
                    offerId, existing.getId(), existing.getStartedAt());
            return;
        }

        CreateOfferRequest createOfferRequest = offerProcessingHelper.loadOfferRequest(offerId);
        if (createOfferRequest == null) {
            return;
        }
        LOGGER.info("Processing offer {} (type={}) with batch tracking", offerId, createOfferRequest.getSchemeType());

        // == is the null-safe enum comparison; a null scheme type falls through to the warn branch.
        OfferSchemeType schemeType = createOfferRequest.getSchemeType();
        if (schemeType == OfferSchemeType.ACTIVATION) {
            processActivationOfferWithBatch(offerId, createOfferRequest);
        } else if (schemeType == OfferSchemeType.SELLIN) {
            processSellinOfferWithBatch(offerId, createOfferRequest);
        } else {
            LOGGER.warn("Offer {} has unsupported scheme type: {}", offerId, schemeType);
        }
    }

    /**
     * Runs one batch over all eligible activation partners: calculate payouts,
     * drop already-fully-paid partners, create the batch record, then process
     * each partner in its own REQUIRES_NEW transaction via the helper.
     * Per-partner failures are recorded on the batch and do not stop the loop.
     */
    private void processActivationOfferWithBatch(int offerId, CreateOfferRequest createOfferRequest) throws Exception {
        List<OfferPartnerPayoutData> partnerPayouts;
        try {
            partnerPayouts = offerService.calculateOfferPayouts(createOfferRequest);
        } catch (Exception e) {
            // Keep the full stack trace — message-only logging hid the root cause.
            LOGGER.error("Failed to calculate activation payouts for offer {}: {}", offerId, e.getMessage(), e);
            return;
        }

        if (partnerPayouts.isEmpty()) {
            LOGGER.info("No eligible partners for activation offer {}", offerId);
            return;
        }

        int beforeFilter = partnerPayouts.size();
        partnerPayouts = partnerPayouts.stream()
                .filter(offerProcessingHelper::hasRemainingOfferPayout)
                .collect(Collectors.toList());
        if (partnerPayouts.isEmpty()) {
            LOGGER.info("Activation offer {}: all {} eligible partners already fully paid, skipping batch", offerId, beforeFilter);
            return;
        }
        if (partnerPayouts.size() < beforeFilter) {
            LOGGER.info("Activation offer {}: filtered {} already-paid partners, {} remain", offerId, beforeFilter - partnerPayouts.size(), partnerPayouts.size());
        }

        LinkedHashMap<Integer, String> fofoIdPartnerNameMap = new LinkedHashMap<>();
        for (OfferPartnerPayoutData data : partnerPayouts) {
            fofoIdPartnerNameMap.put(data.getFofoId(), "fofo-" + data.getFofoId());
        }

        CronBatch batch = cronBatchService.createBatch("processActivationOffer-" + offerId, fofoIdPartnerNameMap);

        // Finalize in finally: if markItemFailed (or anything else) escapes the
        // per-partner catch, the batch must not be left "running" forever — a stuck
        // running batch permanently blocks reprocessing of this offer.
        try {
            for (OfferPartnerPayoutData data : partnerPayouts) {
                try {
                    offerProcessingHelper.processPartnerOfferPayout(
                            batch.getId(), data.getFofoId(), offerId, createOfferRequest.getDescription(),
                            data.getLineItemValueMap(), data.getItemCriteriaPayout(), data.getCriteriaId(),
                            data.getSerialNumberPaid(), data.getAgeingSummaryModelsMap(),
                            data.getEligiblePayoutValue(), data.isDiscount());
                } catch (Exception e) {
                    LOGGER.error("Activation offer {} failed for fofoId={}: {}", offerId, data.getFofoId(), e.getMessage(), e);
                    cronBatchService.markItemFailed(batch.getId(), data.getFofoId(), e.getMessage());
                }
            }
        } finally {
            cronBatchService.finalizeBatch(batch.getId());
        }
    }

    /**
     * Sellin counterpart of {@link #processActivationOfferWithBatch}: same batch
     * lifecycle, different payout data shape and helper entry point.
     */
    private void processSellinOfferWithBatch(int offerId, CreateOfferRequest createOfferRequest) throws Exception {
        List<SellinPartnerPayoutData> partnerPayouts;
        try {
            partnerPayouts = offerService.calculateSellinPayouts(createOfferRequest);
        } catch (Exception e) {
            // Keep the full stack trace — message-only logging hid the root cause.
            LOGGER.error("Failed to calculate sellin payouts for offer {}: {}", offerId, e.getMessage(), e);
            return;
        }

        if (partnerPayouts.isEmpty()) {
            LOGGER.info("No eligible partners for sellin offer {}", offerId);
            return;
        }

        int beforeFilter = partnerPayouts.size();
        partnerPayouts = partnerPayouts.stream()
                .filter(offerProcessingHelper::hasRemainingSellinPayout)
                .collect(Collectors.toList());
        if (partnerPayouts.isEmpty()) {
            LOGGER.info("Sellin offer {}: all {} eligible partners already fully paid, skipping batch", offerId, beforeFilter);
            return;
        }
        if (partnerPayouts.size() < beforeFilter) {
            LOGGER.info("Sellin offer {}: filtered {} already-paid partners, {} remain", offerId, beforeFilter - partnerPayouts.size(), partnerPayouts.size());
        }

        LinkedHashMap<Integer, String> fofoIdPartnerNameMap = new LinkedHashMap<>();
        for (SellinPartnerPayoutData data : partnerPayouts) {
            fofoIdPartnerNameMap.put(data.getFofoId(), "fofo-" + data.getFofoId());
        }

        CronBatch batch = cronBatchService.createBatch("processSellinOffer-" + offerId, fofoIdPartnerNameMap);

        // Finalize in finally — see processActivationOfferWithBatch for rationale.
        try {
            for (SellinPartnerPayoutData data : partnerPayouts) {
                try {
                    offerProcessingHelper.processPartnerSellinPayout(
                            batch.getId(), data.getFofoId(), offerId, createOfferRequest.getDescription(),
                            data.getSerialNumberInventoryItemMap(), data.getItemCriteriaPayout(), data.getCriteriaId(),
                            data.getSerialNumberPaid(), data.getEligiblePayoutValue(), data.isDiscount());
                } catch (Exception e) {
                    LOGGER.error("Sellin offer {} failed for fofoId={}: {}", offerId, data.getFofoId(), e.getMessage(), e);
                    cronBatchService.markItemFailed(batch.getId(), data.getFofoId(), e.getMessage());
                }
            }
        } finally {
            cronBatchService.finalizeBatch(batch.getId());
        }
    }
}