Rev 36357 | Blame | Compare with Previous | Last modification | View Log | RSS feed
package com.spice.profitmandi.service.offers;

import com.spice.profitmandi.dao.entity.transaction.CronBatch;
import com.spice.profitmandi.dao.enumuration.catalog.OfferSchemeType;
import com.spice.profitmandi.dao.model.CreateOfferRequest;
import com.spice.profitmandi.service.cron.CronBatchService;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.LinkedHashMap;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

/**
 * Orchestrates offer processing with per-partner transaction isolation and batch tracking.
 *
 * NO @Transactional on this class or any of its methods — the orchestrator must not
 * carry an outer transaction. Each partner's write work runs in REQUIRES_NEW via
 * OfferProcessingHelper; batch metadata writes run in REQUIRES_NEW via CronBatchService.
 * If the caller is already inside a transaction (e.g., controller class-level @Transactional),
 * REQUIRES_NEW will suspend it cleanly.
 */
@Service
public class OfferBatchService {

    private static final Logger LOGGER = LogManager.getLogger(OfferBatchService.class);

    /** Shared pool of three daemon threads that run batches submitted via {@link #submitBatchAsync(int)}. */
    private static final ExecutorService BATCH_EXECUTOR = Executors.newFixedThreadPool(3, r -> {
        Thread t = new Thread(r, "offer-batch-worker");
        t.setDaemon(true);
        return t;
    });

    /**
     * Offer ids currently claimed by {@link #submitBatchAsync(int)}. An id is added before the
     * task is scheduled and removed when the task ends or when scheduling is abandoned.
     * NOTE(review): this is per-instance state, so it only de-duplicates across the JVM if this
     * bean is a singleton — confirm the bean scope.
     */
    private final Set<Integer> inFlightOfferIds = ConcurrentHashMap.newKeySet();

    @Autowired
    private OfferService offerService;

    @Autowired
    private OfferProcessingHelper offerProcessingHelper;

    @Autowired
    private CronBatchService cronBatchService;

    /**
     * Reports whether the batch service currently returns a running batch for the offer.
     *
     * @param offerId offer to check
     * @return {@code true} when {@code cronBatchService.findRunningForOffer(offerId)} is non-null
     */
    public boolean hasUnfinishedBatch(int offerId) {
        return cronBatchService.findRunningForOffer(offerId) != null;
    }

    /**
     * Fire-and-forget: schedule the batch on a background worker and return immediately.
     * Prevents duplicate concurrent runs for the same offerId at the JVM level.
     * Used by HTTP controllers; cron CLI keeps using the sync processOfferWithBatch.
     *
     * @param offerId offer whose batch should be processed
     * @return a human-readable status message describing whether the batch was submitted,
     *         is already in flight, or is blocked by an unfinished batch
     */
    public String submitBatchAsync(int offerId) {
        if (!inFlightOfferIds.add(offerId)) {
            LOGGER.info("Offer {} batch submit ignored — already in flight", offerId);
            return "Offer " + offerId + " is already being processed. Check the batch summary for progress.";
        }

        // The claim on offerId must be released on every path that does not hand the work to the
        // worker, including a failing lookup or a rejected submit; otherwise the offer would be
        // reported as "already being processed" for the rest of the JVM's life.
        boolean handedOff = false;
        try {
            CronBatch existing = cronBatchService.findRunningForOffer(offerId);
            if (existing != null) {
                LOGGER.info("Offer {} has unfinished batch {} (started {}); blocking reprocess",
                        offerId, existing.getId(), existing.getStartedAt());
                return "Offer " + offerId + " has an unfinished batch. Reprocessing is not allowed until the existing batch is fully processed.";
            }

            BATCH_EXECUTOR.submit(() -> {
                try {
                    processOfferWithBatch(offerId);
                } catch (Throwable t) {
                    // The Future returned by submit() is discarded, so anything not logged here
                    // (including Errors) would disappear silently.
                    LOGGER.error("Offer {} batch processing failed: {}", offerId, t.getMessage(), t);
                } finally {
                    inFlightOfferIds.remove(offerId);
                }
            });
            // From here on the worker's finally block owns the release of the claim.
            handedOff = true;
        } finally {
            if (!handedOff) {
                inFlightOfferIds.remove(offerId);
            }
        }

        LOGGER.info("Offer {} batch submitted to background worker", offerId);
        return "Offer " + offerId + " submitted for processing. Check the batch summary for progress.";
    }

    /**
     * Synchronously processes one offer with batch tracking.
     *
     * Returns without doing anything when a running batch already exists for the offer or when
     * the offer request cannot be loaded. ACTIVATION and SELLIN scheme types are dispatched to
     * their handlers; any other (or missing) scheme type is logged as unsupported.
     *
     * @param offerId offer to process
     * @throws Exception if loading the offer, creating the batch or finalizing the batch fails
     */
    public void processOfferWithBatch(int offerId) throws Exception {
        CronBatch existing = cronBatchService.findRunningForOffer(offerId);
        if (existing != null) {
            LOGGER.info("Offer {} already running (batch {}, started {}); skipping duplicate run",
                    offerId, existing.getId(), existing.getStartedAt());
            return;
        }

        CreateOfferRequest createOfferRequest = offerProcessingHelper.loadOfferRequest(offerId);
        if (createOfferRequest == null) {
            return;
        }

        LOGGER.info("Processing offer {} (type={}) with batch tracking", offerId, createOfferRequest.getSchemeType());

        // Identity comparison is the idiomatic enum check and, unlike equals(), does not throw
        // when the scheme type is null — a null type falls through to the "unsupported" warning.
        OfferSchemeType schemeType = createOfferRequest.getSchemeType();
        if (schemeType == OfferSchemeType.ACTIVATION) {
            processActivationOfferWithBatch(offerId, createOfferRequest);
        } else if (schemeType == OfferSchemeType.SELLIN) {
            processSellinOfferWithBatch(offerId, createOfferRequest);
        } else {
            LOGGER.warn("Offer {} has unsupported scheme type: {}", offerId, schemeType);
        }
    }

    /**
     * Calculates activation payouts, drops partners for whom the helper reports no remaining
     * payout, then creates a batch and processes each remaining partner individually. A failure
     * for one partner is recorded against the batch and does not stop the others.
     */
    private void processActivationOfferWithBatch(int offerId, CreateOfferRequest createOfferRequest) throws Exception {
        List<OfferPartnerPayoutData> partnerPayouts;
        try {
            partnerPayouts = offerService.calculateOfferPayouts(createOfferRequest);
        } catch (Exception e) {
            LOGGER.error("Failed to calculate activation payouts for offer {}: {}", offerId, e.getMessage(), e);
            return;
        }

        if (partnerPayouts == null || partnerPayouts.isEmpty()) {
            LOGGER.info("No eligible partners for activation offer {}", offerId);
            return;
        }

        int beforeFilter = partnerPayouts.size();
        partnerPayouts = partnerPayouts.stream()
                .filter(offerProcessingHelper::hasRemainingOfferPayout)
                .collect(Collectors.toList());

        if (partnerPayouts.isEmpty()) {
            LOGGER.info("Activation offer {}: all {} eligible partners already fully paid, skipping batch", offerId, beforeFilter);
            return;
        }
        if (partnerPayouts.size() < beforeFilter) {
            LOGGER.info("Activation offer {}: filtered {} already-paid partners, {} remain",
                    offerId, beforeFilter - partnerPayouts.size(), partnerPayouts.size());
        }

        // Insertion-ordered so the batch items follow the order of the calculated payouts.
        LinkedHashMap<Integer, String> fofoIdPartnerNameMap = new LinkedHashMap<>();
        for (OfferPartnerPayoutData data : partnerPayouts) {
            fofoIdPartnerNameMap.put(data.getFofoId(), "fofo-" + data.getFofoId());
        }

        CronBatch batch = cronBatchService.createBatch("processActivationOffer-" + offerId, fofoIdPartnerNameMap);

        for (OfferPartnerPayoutData data : partnerPayouts) {
            try {
                offerProcessingHelper.processPartnerOfferPayout(
                        batch.getId(), data.getFofoId(), offerId, createOfferRequest.getDescription(),
                        data.getLineItemValueMap(), data.getItemCriteriaPayout(), data.getCriteriaId(),
                        data.getSerialNumberPaid(), data.getAgeingSummaryModelsMap(),
                        data.getEligiblePayoutValue(), data.isDiscount());
            } catch (Exception e) {
                recordPartnerFailure("Activation", offerId, batch, data.getFofoId(), e);
            }
        }

        cronBatchService.finalizeBatch(batch.getId());
    }

    /**
     * Sell-in counterpart of {@link #processActivationOfferWithBatch}: calculates sell-in
     * payouts, drops partners for whom the helper reports no remaining payout, then creates a
     * batch and processes each remaining partner individually.
     */
    private void processSellinOfferWithBatch(int offerId, CreateOfferRequest createOfferRequest) throws Exception {
        List<SellinPartnerPayoutData> partnerPayouts;
        try {
            partnerPayouts = offerService.calculateSellinPayouts(createOfferRequest);
        } catch (Exception e) {
            LOGGER.error("Failed to calculate sellin payouts for offer {}: {}", offerId, e.getMessage(), e);
            return;
        }

        if (partnerPayouts == null || partnerPayouts.isEmpty()) {
            LOGGER.info("No eligible partners for sellin offer {}", offerId);
            return;
        }

        int beforeFilter = partnerPayouts.size();
        partnerPayouts = partnerPayouts.stream()
                .filter(offerProcessingHelper::hasRemainingSellinPayout)
                .collect(Collectors.toList());

        if (partnerPayouts.isEmpty()) {
            LOGGER.info("Sellin offer {}: all {} eligible partners already fully paid, skipping batch", offerId, beforeFilter);
            return;
        }
        if (partnerPayouts.size() < beforeFilter) {
            LOGGER.info("Sellin offer {}: filtered {} already-paid partners, {} remain",
                    offerId, beforeFilter - partnerPayouts.size(), partnerPayouts.size());
        }

        // Insertion-ordered so the batch items follow the order of the calculated payouts.
        LinkedHashMap<Integer, String> fofoIdPartnerNameMap = new LinkedHashMap<>();
        for (SellinPartnerPayoutData data : partnerPayouts) {
            fofoIdPartnerNameMap.put(data.getFofoId(), "fofo-" + data.getFofoId());
        }

        CronBatch batch = cronBatchService.createBatch("processSellinOffer-" + offerId, fofoIdPartnerNameMap);

        for (SellinPartnerPayoutData data : partnerPayouts) {
            try {
                offerProcessingHelper.processPartnerSellinPayout(
                        batch.getId(), data.getFofoId(), offerId, createOfferRequest.getDescription(),
                        data.getSerialNumberInventoryItemMap(), data.getItemCriteriaPayout(), data.getCriteriaId(),
                        data.getSerialNumberPaid(), data.getEligiblePayoutValue(), data.isDiscount());
            } catch (Exception e) {
                recordPartnerFailure("Sellin", offerId, batch, data.getFofoId(), e);
            }
        }

        cronBatchService.finalizeBatch(batch.getId());
    }

    /**
     * Logs a single partner's processing failure and marks its batch item as failed.
     *
     * The marking call is itself guarded: if it throws, the error is logged and swallowed so the
     * caller's loop can continue with the remaining partners and still reach finalizeBatch.
     *
     * @param offerKind label used as the log prefix ("Activation" or "Sellin")
     * @param offerId   offer being processed
     * @param batch     batch the partner belongs to
     * @param fofoId    partner whose processing failed
     * @param failure   exception thrown while processing the partner
     */
    private void recordPartnerFailure(String offerKind, int offerId, CronBatch batch, Integer fofoId, Exception failure) {
        LOGGER.error("{} offer {} failed for fofoId={}: {}", offerKind, offerId, fofoId, failure.getMessage(), failure);
        try {
            cronBatchService.markItemFailed(batch.getId(), fofoId, failure.getMessage());
        } catch (Exception markFailure) {
            LOGGER.error("{} offer {}: could not mark fofoId={} as failed in batch {}: {}",
                    offerKind, offerId, fofoId, batch.getId(), markFailure.getMessage(), markFailure);
        }
    }
}