Subversion Repositories SmartDukaan

Rev

Rev 35250 | Details | Compare with Previous | Last modification | View Log | RSS feed

Rev Author Line No. Line
35250 amit 1
package com.smartdukaan.cron.scheduled;
2
 
3
import org.apache.logging.log4j.LogManager;
4
import org.apache.logging.log4j.Logger;
5
import org.hibernate.Session;
6
import org.hibernate.SessionFactory;
7
import org.springframework.beans.factory.annotation.Autowired;
8
import org.springframework.stereotype.Component;
9
import org.springframework.transaction.annotation.Transactional;
10
 
11
import javax.persistence.Query;
12
import java.math.BigInteger;
13
 
14
@Component
15
@Transactional(rollbackFor = Throwable.class)
16
public class Migrations {
17
 
18
    private static final Logger LOGGER = LogManager.getLogger(Migrations.class);
19
 
20
    @Autowired
21
    private SessionFactory sessionFactory;
22
 
23
    public void migrateWarehouseOriginalInventoryItemId() {
24
        migrateWarehouseOriginalInventoryItemId(10_000);
25
    }
26
 
27
    public void migrateWarehouseOriginalInventoryItemId(int batchSize) {
28
        Session session = sessionFactory.getCurrentSession();
29
 
30
        ensureColumnExists(session);
31
        ensureIndexExists(session);
32
 
35265 amit 33
        int totalUpdated = 1;
35250 amit 34
        while (true) {
35
            // Build the mapping subquery limited to batchSize
36
            String mappingCte =
37
                    "SELECT wi.id AS target_id, (" +
38
                    "  SELECT orig.id FROM warehouse.inventoryItem orig " +
39
                    "  JOIN warehouse.purchase p2 ON p2.id = orig.purchaseId " +
40
                    "  JOIN warehouse.purchaseorder po2 ON po2.id = p2.purchaseOrder_id " +
41
                    "  JOIN warehouse.supplier s2 ON s2.id = po2.supplierId " +
42
                    "  WHERE orig.serialNumber = wi.serialNumber " +
43
                    "    AND orig.itemId = wi.itemId " +
44
                    "    AND orig.id <> wi.id " +
45
                    "  ORDER BY s2.internal ASC, orig.created ASC, orig.id ASC LIMIT 1" +
46
                    ") AS original_id " +
47
                    "FROM warehouse.inventoryItem wi " +
48
                    "JOIN warehouse.purchase p ON p.id = wi.purchaseId " +
49
                    "JOIN warehouse.purchaseorder po ON po.id = p.purchaseOrder_id " +
50
                    "JOIN warehouse.supplier s ON s.id = po.supplierId " +
51
                    "WHERE s.internal = 1 AND wi.originalInventoryItemId IS NULL " +
52
                    "  AND wi.serialNumber IS NOT NULL AND LENGTH(wi.serialNumber) > 0 " +
53
                    "LIMIT :batchSize";
54
 
55
            String updateSql =
56
                    "UPDATE warehouse.inventoryItem wi " +
57
                    "JOIN (" + mappingCte + ") m ON wi.id = m.target_id " +
58
                    "SET wi.originalInventoryItemId = m.original_id " +
59
                    "WHERE m.original_id IS NOT NULL";
60
 
61
            Query q = session.createNativeQuery(updateSql);
62
            q.setParameter("batchSize", batchSize);
63
            int updated = q.executeUpdate();
64
            totalUpdated += updated;
65
 
66
            LOGGER.info("Backfill batch updated rows: {} (total so far: {})", updated, totalUpdated);
67
 
68
            // If fewer than batchSize were eligible or none updated, we may still have NULL mappings; check pending count.
69
            long remaining = countPending(session);
70
            LOGGER.info("Remaining internal inventory rows without originalInventoryItemId: {}", remaining);
71
            if (updated == 0 || remaining == 0) {
72
                break;
73
            }
74
        }
75
        LOGGER.info("Backfill complete. Total rows updated: {}", totalUpdated);
76
    }
77
 
78
    private long countPending(Session session) {
79
        String sql = "SELECT COUNT(*) FROM warehouse.inventoryItem wi " +
80
                "JOIN warehouse.purchase p ON p.id = wi.purchaseId " +
81
                "JOIN warehouse.purchaseorder po ON po.id = p.purchaseOrder_id " +
82
                "JOIN warehouse.supplier s ON s.id = po.supplierId " +
83
                "WHERE s.internal = 1 AND wi.originalInventoryItemId IS NULL " +
84
                "  AND wi.serialNumber IS NOT NULL AND LENGTH(wi.serialNumber) > 0";
85
        Object res = session.createNativeQuery(sql).getSingleResult();
86
        if (res instanceof BigInteger) return ((BigInteger) res).longValue();
87
        if (res instanceof Number) return ((Number) res).longValue();
88
        return Long.parseLong(String.valueOf(res));
89
    }
90
 
91
    private void ensureColumnExists(Session session) {
92
        String checkSql = "SELECT COUNT(*) FROM information_schema.COLUMNS WHERE TABLE_SCHEMA = 'warehouse' " +
93
                "AND TABLE_NAME = 'inventoryItem' AND COLUMN_NAME = 'originalInventoryItemId'";
94
        Object res = session.createNativeQuery(checkSql).getSingleResult();
95
        long cnt = (res instanceof BigInteger) ? ((BigInteger) res).longValue() : Long.parseLong(String.valueOf(res));
96
        if (cnt == 0) {
97
            LOGGER.info("Adding column warehouse.inventoryItem.originalInventoryItemId ...");
98
            session.createNativeQuery("ALTER TABLE warehouse.inventoryItem ADD COLUMN originalInventoryItemId INT NULL").executeUpdate();
99
            LOGGER.info("Column added.");
100
        } else {
101
            LOGGER.info("Column originalInventoryItemId already exists. Skipping add.");
102
        }
103
    }
104
 
105
    private void ensureIndexExists(Session session) {
106
        String checkIdx = "SELECT COUNT(1) FROM information_schema.STATISTICS WHERE TABLE_SCHEMA='warehouse' " +
107
                "AND TABLE_NAME='inventoryItem' AND INDEX_NAME='idx_inventoryItem_originalInventoryItemId'";
108
        Object res = session.createNativeQuery(checkIdx).getSingleResult();
109
        long cnt = (res instanceof BigInteger) ? ((BigInteger) res).longValue() : Long.parseLong(String.valueOf(res));
110
        if (cnt == 0) {
111
            LOGGER.info("Creating index idx_inventoryItem_originalInventoryItemId ...");
112
            session.createNativeQuery("CREATE INDEX idx_inventoryItem_originalInventoryItemId ON warehouse.inventoryItem(originalInventoryItemId)").executeUpdate();
113
            LOGGER.info("Index created.");
114
        } else {
115
            LOGGER.info("Index idx_inventoryItem_originalInventoryItemId already exists. Skipping create.");
116
        }
117
 
118
        // Helpful index on serialNumber for performance during backfill
119
        String checkSerialIdx = "SELECT COUNT(1) FROM information_schema.STATISTICS WHERE TABLE_SCHEMA='warehouse' " +
120
                "AND TABLE_NAME='inventoryItem' AND INDEX_NAME='idx_inventoryItem_serialNumber'";
121
        Object res2 = session.createNativeQuery(checkSerialIdx).getSingleResult();
122
        long cnt2 = (res2 instanceof BigInteger) ? ((BigInteger) res2).longValue() : Long.parseLong(String.valueOf(res2));
123
        if (cnt2 == 0) {
124
            LOGGER.info("Creating index idx_inventoryItem_serialNumber ...");
125
            session.createNativeQuery("CREATE INDEX idx_inventoryItem_serialNumber ON warehouse.inventoryItem(serialNumber)").executeUpdate();
126
            LOGGER.info("Index created.");
127
        } else {
128
            LOGGER.info("Index idx_inventoryItem_serialNumber already exists. Skipping create.");
129
        }
130
 
131
        // Composite index to speed up lookup by (itemId, serialNumber)
132
        String checkItemSerialIdx = "SELECT COUNT(1) FROM information_schema.STATISTICS WHERE TABLE_SCHEMA='warehouse' " +
133
                "AND TABLE_NAME='inventoryItem' AND INDEX_NAME='idx_inventoryItem_itemId_serialNumber'";
134
        Object res3 = session.createNativeQuery(checkItemSerialIdx).getSingleResult();
135
        long cnt3 = (res3 instanceof BigInteger) ? ((BigInteger) res3).longValue() : Long.parseLong(String.valueOf(res3));
136
        if (cnt3 == 0) {
137
            LOGGER.info("Creating index idx_inventoryItem_itemId_serialNumber ...");
138
            session.createNativeQuery("CREATE INDEX idx_inventoryItem_itemId_serialNumber ON warehouse.inventoryItem(itemId, serialNumber)").executeUpdate();
139
            LOGGER.info("Index created.");
140
        } else {
141
            LOGGER.info("Index idx_inventoryItem_itemId_serialNumber already exists. Skipping create.");
142
        }
143
    }
144
}