Subversion Repositories SmartDukaan

Rev

Rev 35250 | Blame | Compare with Previous | Last modification | View Log | RSS feed

package com.smartdukaan.cron.scheduled;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.hibernate.Session;
import org.hibernate.SessionFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;

import javax.persistence.Query;
import java.math.BigInteger;

@Component
@Transactional(rollbackFor = Throwable.class)
public class Migrations {

    private static final Logger LOGGER = LogManager.getLogger(Migrations.class);

    @Autowired
    private SessionFactory sessionFactory;

    public void migrateWarehouseOriginalInventoryItemId() {
        migrateWarehouseOriginalInventoryItemId(10_000);
    }

    public void migrateWarehouseOriginalInventoryItemId(int batchSize) {
        Session session = sessionFactory.getCurrentSession();

        ensureColumnExists(session);
        ensureIndexExists(session);

        int totalUpdated = 1;
        while (true) {
            // Build the mapping subquery limited to batchSize
            String mappingCte =
                    "SELECT wi.id AS target_id, (" +
                    "  SELECT orig.id FROM warehouse.inventoryItem orig " +
                    "  JOIN warehouse.purchase p2 ON p2.id = orig.purchaseId " +
                    "  JOIN warehouse.purchaseorder po2 ON po2.id = p2.purchaseOrder_id " +
                    "  JOIN warehouse.supplier s2 ON s2.id = po2.supplierId " +
                    "  WHERE orig.serialNumber = wi.serialNumber " +
                    "    AND orig.itemId = wi.itemId " +
                    "    AND orig.id <> wi.id " +
                    "  ORDER BY s2.internal ASC, orig.created ASC, orig.id ASC LIMIT 1" +
                    ") AS original_id " +
                    "FROM warehouse.inventoryItem wi " +
                    "JOIN warehouse.purchase p ON p.id = wi.purchaseId " +
                    "JOIN warehouse.purchaseorder po ON po.id = p.purchaseOrder_id " +
                    "JOIN warehouse.supplier s ON s.id = po.supplierId " +
                    "WHERE s.internal = 1 AND wi.originalInventoryItemId IS NULL " +
                    "  AND wi.serialNumber IS NOT NULL AND LENGTH(wi.serialNumber) > 0 " +
                    "LIMIT :batchSize";

            String updateSql =
                    "UPDATE warehouse.inventoryItem wi " +
                    "JOIN (" + mappingCte + ") m ON wi.id = m.target_id " +
                    "SET wi.originalInventoryItemId = m.original_id " +
                    "WHERE m.original_id IS NOT NULL";

            Query q = session.createNativeQuery(updateSql);
            q.setParameter("batchSize", batchSize);
            int updated = q.executeUpdate();
            totalUpdated += updated;

            LOGGER.info("Backfill batch updated rows: {} (total so far: {})", updated, totalUpdated);

            // If fewer than batchSize were eligible or none updated, we may still have NULL mappings; check pending count.
            long remaining = countPending(session);
            LOGGER.info("Remaining internal inventory rows without originalInventoryItemId: {}", remaining);
            if (updated == 0 || remaining == 0) {
                break;
            }
        }
        LOGGER.info("Backfill complete. Total rows updated: {}", totalUpdated);
    }

    private long countPending(Session session) {
        String sql = "SELECT COUNT(*) FROM warehouse.inventoryItem wi " +
                "JOIN warehouse.purchase p ON p.id = wi.purchaseId " +
                "JOIN warehouse.purchaseorder po ON po.id = p.purchaseOrder_id " +
                "JOIN warehouse.supplier s ON s.id = po.supplierId " +
                "WHERE s.internal = 1 AND wi.originalInventoryItemId IS NULL " +
                "  AND wi.serialNumber IS NOT NULL AND LENGTH(wi.serialNumber) > 0";
        Object res = session.createNativeQuery(sql).getSingleResult();
        if (res instanceof BigInteger) return ((BigInteger) res).longValue();
        if (res instanceof Number) return ((Number) res).longValue();
        return Long.parseLong(String.valueOf(res));
    }

    private void ensureColumnExists(Session session) {
        String checkSql = "SELECT COUNT(*) FROM information_schema.COLUMNS WHERE TABLE_SCHEMA = 'warehouse' " +
                "AND TABLE_NAME = 'inventoryItem' AND COLUMN_NAME = 'originalInventoryItemId'";
        Object res = session.createNativeQuery(checkSql).getSingleResult();
        long cnt = (res instanceof BigInteger) ? ((BigInteger) res).longValue() : Long.parseLong(String.valueOf(res));
        if (cnt == 0) {
            LOGGER.info("Adding column warehouse.inventoryItem.originalInventoryItemId ...");
            session.createNativeQuery("ALTER TABLE warehouse.inventoryItem ADD COLUMN originalInventoryItemId INT NULL").executeUpdate();
            LOGGER.info("Column added.");
        } else {
            LOGGER.info("Column originalInventoryItemId already exists. Skipping add.");
        }
    }

    private void ensureIndexExists(Session session) {
        String checkIdx = "SELECT COUNT(1) FROM information_schema.STATISTICS WHERE TABLE_SCHEMA='warehouse' " +
                "AND TABLE_NAME='inventoryItem' AND INDEX_NAME='idx_inventoryItem_originalInventoryItemId'";
        Object res = session.createNativeQuery(checkIdx).getSingleResult();
        long cnt = (res instanceof BigInteger) ? ((BigInteger) res).longValue() : Long.parseLong(String.valueOf(res));
        if (cnt == 0) {
            LOGGER.info("Creating index idx_inventoryItem_originalInventoryItemId ...");
            session.createNativeQuery("CREATE INDEX idx_inventoryItem_originalInventoryItemId ON warehouse.inventoryItem(originalInventoryItemId)").executeUpdate();
            LOGGER.info("Index created.");
        } else {
            LOGGER.info("Index idx_inventoryItem_originalInventoryItemId already exists. Skipping create.");
        }

        // Helpful index on serialNumber for performance during backfill
        String checkSerialIdx = "SELECT COUNT(1) FROM information_schema.STATISTICS WHERE TABLE_SCHEMA='warehouse' " +
                "AND TABLE_NAME='inventoryItem' AND INDEX_NAME='idx_inventoryItem_serialNumber'";
        Object res2 = session.createNativeQuery(checkSerialIdx).getSingleResult();
        long cnt2 = (res2 instanceof BigInteger) ? ((BigInteger) res2).longValue() : Long.parseLong(String.valueOf(res2));
        if (cnt2 == 0) {
            LOGGER.info("Creating index idx_inventoryItem_serialNumber ...");
            session.createNativeQuery("CREATE INDEX idx_inventoryItem_serialNumber ON warehouse.inventoryItem(serialNumber)").executeUpdate();
            LOGGER.info("Index created.");
        } else {
            LOGGER.info("Index idx_inventoryItem_serialNumber already exists. Skipping create.");
        }

        // Composite index to speed up lookup by (itemId, serialNumber)
        String checkItemSerialIdx = "SELECT COUNT(1) FROM information_schema.STATISTICS WHERE TABLE_SCHEMA='warehouse' " +
                "AND TABLE_NAME='inventoryItem' AND INDEX_NAME='idx_inventoryItem_itemId_serialNumber'";
        Object res3 = session.createNativeQuery(checkItemSerialIdx).getSingleResult();
        long cnt3 = (res3 instanceof BigInteger) ? ((BigInteger) res3).longValue() : Long.parseLong(String.valueOf(res3));
        if (cnt3 == 0) {
            LOGGER.info("Creating index idx_inventoryItem_itemId_serialNumber ...");
            session.createNativeQuery("CREATE INDEX idx_inventoryItem_itemId_serialNumber ON warehouse.inventoryItem(itemId, serialNumber)").executeUpdate();
            LOGGER.info("Index created.");
        } else {
            LOGGER.info("Index idx_inventoryItem_itemId_serialNumber already exists. Skipping create.");
        }
    }
}