Subversion Repositories SmartDukaan

Rev

Rev 3704 | Blame | Compare with Previous | Last modification | View Log | RSS feed

package in.shop2020.web;

import in.shop2020.model.Item;
import in.shop2020.model.ItemActivityWithSource;
import in.shop2020.server.ItemActivityWithSourceRepository;
import in.shop2020.server.ItemRepository;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.TimeZone;
import java.util.logging.Logger;

import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import com.google.appengine.api.datastore.DatastoreService;
import com.google.appengine.api.datastore.DatastoreServiceFactory;
import com.google.appengine.api.datastore.Entity;
import com.google.appengine.api.datastore.FetchOptions;
import com.google.appengine.api.datastore.PreparedQuery;
import com.google.appengine.api.datastore.Query;

public class DailyProductWithSourceAggregatorServlet extends HttpServlet {
    /** Serialization version identifier for this servlet class. */
    private static final long serialVersionUID = -8236918415987438049L;
    /** Logger named after this class; the servlet logs its progress at WARNING level. */
    private static Logger logger = Logger.getLogger(DailyProductWithSourceAggregatorServlet.class.getName());
    
    // Datastore entity kind that every query in this class reads, and the
    // entity property names used in filters and property lookups.
    private static final String KIND = "DataLog";
    private static final String SESSION_ID_FIELD = "sessionId";
    private static final String ITEMID_FIELD = "itemId";
    private static final String ITEMIDS_FIELD = "itemIds";
    private static final String PRODUCTNAME_FIELD = "productName";
    private static final String EVENTTYPE_FIELD = "eventType";
    private static final String DATE_FIELD = "date";
    private static final String FIRST_SOURCE_FIELD = "firstSource";
    private static final String SESSION_SOURCE_FIELD = "source";
    
    // Values matched against the EVENTTYPE_FIELD property when querying events.
    private static final String PRODUCT_VIEW_EVENT = "PRODUCT_VIEW";
    private static final String ADD_TO_CART_EVENT = "ADD_TO_CART";
    private static final String DELETE_FROM_CART_EVENT = "DELETE_FROM_CART";
    private static final String RESEARCH_ADD_EVENT = "RESEARCH_ADD";
    private static final String RESEARCH_DELETE_EVENT = "RESEARCH_DELETE";
    // NOTE(review): the value is spelled "SHIPPINIG_ACCESS". It must match what
    // the event writer stores, so confirm against the producer before "fixing" it.
    private static final String SHIPPING_ACCESS_EVENT = "SHIPPINIG_ACCESS";
    private static final String PAYMENT_SUCCESS_EVENT = "PAYMENT_SUCCESS";
    private static final String ORDER_CREATION_EVENT = "ORDER_CREATION";
    private static final String PAYMENT_FAILURE_EVENT = "PAYMENT_FAILURE";
    private static final String PROCEED_TO_PAY_EVENT = "PROCEED_TO_PAY";
    private static final String NEW_SESSION_EVENT = "NEW_SESSION";
    
    // Keys of the per-group counter maps held as values of itemsResultMap.
    // Each event type has a total counter and a "Unique" counter that is
    // incremented at most once per session/item pair within one query window.
    private static final String VIEW_COUNT_KEY = "ViewCount";
    private static final String UNIQUE_VIEW_COUNT_KEY = "UniqueViewCount";
    private static final String ADD_TO_CART_COUNT_KEY = "AddToCartCount";
    private static final String UNIQUE_ADD_TO_CART_COUNT_KEY = "UniqueAddToCartCount";
    private static final String DELETE_FROM_CART_COUNT_KEY = "DeleteFromCartCount";
    private static final String UNIQUE_DELETE_FROM_CART_COUNT_KEY = "UniqueDeleteFromCartCount";
    private static final String ADD_TO_RESEARCH_COUNT_KEY = "AddToResearchCount";
    private static final String UNIQUE_ADD_TO_RESEARCH_COUNT_KEY = "UniqueAddToResearchCount";
    private static final String DELETE_FROM_RESEARCH_COUNT_KEY = "DeleteFromResearchCount";
    private static final String UNIQUE_DELETE_FROM_RESEARCH_COUNT_KEY = "UniqueDeleteFromResearchCount";
    private static final String SHIPPING_ACCESS_COUNT_KEY = "ShippingAccessCount";
    private static final String UNIQUE_SHIPPING_ACCESS_COUNT_KEY = "UniqueShippingAccessCount";
    private static final String PAYMENT_SUCCESS_COUNT_KEY = "PaymentSuccessCount";
    private static final String UNIQUE_PAYMENT_SUCCESS_COUNT_KEY = "UniquePaymentSuccessCount";
    private static final String ORDER_CREATION_COUNT_KEY = "OrderCreationCount";
    private static final String UNIQUE_ORDER_CREATION_COUNT_KEY = "UniqueOrderCreationCount";
    private static final String PAYMENT_FAILURE_COUNT_KEY = "PaymentFailureCount";
    private static final String UNIQUE_PAYMENT_FAILURE_COUNT_KEY = "UniquePaymentFailureCount";
    private static final String PROCEED_TO_PAY_COUNT_KEY = "ProceedToPayCount";
    private static final String UNIQUE_PROCEED_TO_PAY_COUNT_KEY = "UniqueProceedToPayCount";
    
    // Keys of the per-session attribute maps held as values of sessionIdSourceMap.
    // The two "Paid" entries hold "1" or "0" as strings.
    private static final String FIRST_SOURCE_KEY = "FirstSource";
    private static final String SESSION_SOURCE_KEY = "SessionSource";
    private static final String FIRST_SOURCE_PAID_KEY = "FirstSourcePaid";
    private static final String SESSION_SOURCE_PAID_KEY = "SessionSourcePaid";
    
    
    // NOTE(review): everything below is per-run state kept in instance fields and
    // re-created at the start of doPost. Servlet containers usually share one
    // servlet instance between requests, so two overlapping invocations would
    // corrupt each other's state - confirm this job is only ever run singly.

    /** Items keyed by item id. */
    private Map<Long, Item> itemsMap;
    /** Items keyed by catalog id; populated in doPost but not read anywhere in this class. */
    private Map<Long, Item> itemsCatalogIdMap;
    /** Items keyed by trimmed item name; used to resolve view events that carry no item id. */
    private Map<String, Item> itemsNameMap;
    /**
     * Counter maps keyed by
     * "paidFirstSource__firstSource__paidSessionSource__sessionSource__catalogId".
     */
    private Map<String, Map<String, Long>> itemsResultMap;
    
    /** Cache of source attributes per session id, filled on demand by getSource. */
    private Map<String, Map<String, String>> sessionIdSourceMap;
    
    /** Inclusive start of the day being aggregated. */
    private Date fromDate;
    /** Exclusive end of the day being aggregated. */
    private Date toDate;
    
    /** Inclusive start of the four-hour window currently being queried. */
    private Date queryFromDate;
    /** Exclusive end of the four-hour window currently being queried. */
    private Date queryToDate;
    
    /** Datastore handle obtained at the start of each doPost run. */
    private DatastoreService datastore;
    
    /**
     * Aggregates one day of product activity events, grouped by traffic source
     * and catalog item, and persists the result as {@link ItemActivityWithSource}
     * rows.
     *
     * <p>The day is taken from the optional request parameter {@code date}
     * ({@code yyyyMMdd}, interpreted in IST). When the parameter is absent, empty
     * or unparseable, the day that ended at the most recent IST midnight is used.
     * The day is processed as six consecutive four-hour query windows.
     *
     * @param req  request; only the {@code date} parameter is read
     * @param resp response; not written to
     */
    public void doPost(HttpServletRequest req, HttpServletResponse resp) {
        logger.warning("Running DailyProductWithSourceAggregator");
        sessionIdSourceMap = new HashMap<String, Map<String, String>>();
        itemsMap = new HashMap<Long, Item>();
        itemsCatalogIdMap = new HashMap<Long, Item>();
        itemsNameMap = new HashMap<String, Item>();
        itemsResultMap = new HashMap<String, Map<String, Long>>();

        datastore = DatastoreServiceFactory.getDatastoreService();

        Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("IST"));
        resolveDayWindow(cal, req.getParameter("date"));
        loadItemLookups();

        final int windowsPerDay = 6;
        final int hoursPerWindow = 4;
        for (int i = 0; i < windowsPerDay; i++) {
            cal.setTime(fromDate);
            cal.add(Calendar.HOUR_OF_DAY, hoursPerWindow * i);
            queryFromDate = cal.getTime();

            cal.add(Calendar.HOUR_OF_DAY, hoursPerWindow);
            queryToDate = cal.getTime();

            logger.warning("From Query Date : " + queryFromDate);
            logger.warning("To Query Date : " + queryToDate);

            aggregateCurrentWindow();

            // NOTE(review): itemsResultMap is not cleared between windows, so every
            // pass persists the cumulative counters gathered so far for the day.
            // Whether that is intended depends on how createAll treats rows that
            // already exist (not visible from this class) - confirm before changing.
            ItemActivityWithSourceRepository itemActivityRepository = new ItemActivityWithSourceRepository();
            List<ItemActivityWithSource> itemActivities = buildItemActivities();
            logger.warning("DailyProductWithSourceAggregator : persisting itemactivities : "
                    + itemActivities.size());
            itemActivityRepository.createAll(itemActivities);
        }
    }

    /**
     * Sets {@code fromDate} (inclusive) and {@code toDate} (exclusive) to the IST
     * midnights that bound the day to aggregate.
     *
     * @param cal     IST calendar initialised to the current time; modified in place
     * @param dateStr requested day as {@code yyyyMMdd}, or null/empty for the default
     */
    private void resolveDayWindow(Calendar cal, String dateStr) {
        if (dateStr != null && !dateStr.isEmpty()) {
            SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMdd");
            // Parse in the calendar's zone; parsing in the JVM default zone can
            // land on the previous IST day when that zone is east of IST.
            sdf.setTimeZone(cal.getTimeZone());
            try {
                cal.setTime(sdf.parse(dateStr));
                cal.add(Calendar.DAY_OF_MONTH, 1);
            } catch (ParseException e) {
                // Best effort: keep the calendar at "now" so the default day is used.
                logger.log(java.util.logging.Level.WARNING,
                        "DailyProductWithSourceAggregator : unparseable date parameter '"
                                + dateStr + "', using the default day", e);
            }
        }
        cal.set(Calendar.HOUR_OF_DAY, 0);
        cal.set(Calendar.MINUTE, 0);
        cal.set(Calendar.SECOND, 0);
        cal.set(Calendar.MILLISECOND, 0);
        toDate = cal.getTime();
        cal.add(Calendar.DAY_OF_MONTH, -1);
        fromDate = cal.getTime();
    }

    /** Loads every item and indexes it by id, by catalog id and by trimmed name. */
    private void loadItemLookups() {
        ItemRepository itemRepository = new ItemRepository();
        List<Item> items = itemRepository.getAll();
        for (Item item : items) {
            itemsMap.put(item.getId(), item);
            itemsCatalogIdMap.put(item.getCatalogId(), item);
            // An item without a name simply cannot be found by name.
            if (item.getName() != null) {
                itemsNameMap.put(item.getName().trim(), item);
            }
        }
    }

    /** Folds every tracked event type of the current query window into itemsResultMap. */
    private void aggregateCurrentWindow() {
        logger.warning("DailyProductWithSourceAggregator : Updating View Count Map");
        UpdateViewCount(PRODUCT_VIEW_EVENT, VIEW_COUNT_KEY, UNIQUE_VIEW_COUNT_KEY);
        logger.warning("DailyProductWithSourceAggregator : itemsResultMap size : "
                + itemsResultMap.size());
        logger.warning("DailyProductWithSourceAggregator : sessionIdSourceMap size : "
                + sessionIdSourceMap.size());

        logger.warning("DailyProductWithSourceAggregator : Updating Add To Cart Count Map");
        UpdateItemIdEventCount(ADD_TO_CART_EVENT, ADD_TO_CART_COUNT_KEY,
                UNIQUE_ADD_TO_CART_COUNT_KEY, false);
        logger.warning("DailyProductWithSourceAggregator : Updating Delete From Cart Count Map");
        UpdateItemIdEventCount(DELETE_FROM_CART_EVENT, DELETE_FROM_CART_COUNT_KEY,
                UNIQUE_DELETE_FROM_CART_COUNT_KEY, false);
        logger.warning("DailyProductWithSourceAggregator : Updating Add To Research Count Map");
        UpdateItemIdEventCount(RESEARCH_ADD_EVENT, ADD_TO_RESEARCH_COUNT_KEY,
                UNIQUE_ADD_TO_RESEARCH_COUNT_KEY, true);
        logger.warning("DailyProductWithSourceAggregator : Updating Delete From Research Count Map");
        UpdateItemIdEventCount(RESEARCH_DELETE_EVENT, DELETE_FROM_RESEARCH_COUNT_KEY,
                UNIQUE_DELETE_FROM_RESEARCH_COUNT_KEY, true);

        logger.warning("DailyProductWithSourceAggregator : Updating ShippingAccessCount Count Map");
        UpdateItemIdEventCount(SHIPPING_ACCESS_EVENT, SHIPPING_ACCESS_COUNT_KEY,
                UNIQUE_SHIPPING_ACCESS_COUNT_KEY, false);
        logger.warning("DailyProductWithSourceAggregator : Updating Payment Success Count Map");
        UpdateItemIdEventCount(PAYMENT_SUCCESS_EVENT, PAYMENT_SUCCESS_COUNT_KEY,
                UNIQUE_PAYMENT_SUCCESS_COUNT_KEY, false);
        logger.warning("DailyProductWithSourceAggregator : Updating Order Creation Count Map");
        UpdateItemIdEventCount(ORDER_CREATION_EVENT, ORDER_CREATION_COUNT_KEY,
                UNIQUE_ORDER_CREATION_COUNT_KEY, false);
        logger.warning("DailyProductWithSourceAggregator : Updating Payment Failure Count Map");
        UpdateItemIdEventCount(PAYMENT_FAILURE_EVENT, PAYMENT_FAILURE_COUNT_KEY,
                UNIQUE_PAYMENT_FAILURE_COUNT_KEY, false);
        logger.warning("DailyProductWithSourceAggregator : Updating Proceed To Pay Count Map");
        UpdateItemIdEventCount(PROCEED_TO_PAY_EVENT, PROCEED_TO_PAY_COUNT_KEY,
                UNIQUE_PROCEED_TO_PAY_COUNT_KEY, false);
    }

    /**
     * Converts every entry of itemsResultMap into an activity row. Entries whose
     * key is malformed or does not end in a numeric catalog id are logged and
     * skipped.
     */
    private List<ItemActivityWithSource> buildItemActivities() {
        List<ItemActivityWithSource> itemActivities = new ArrayList<ItemActivityWithSource>();
        for (Entry<String, Map<String, Long>> entry : itemsResultMap.entrySet()) {
            // Key layout: paidFirst__firstSource__paidSession__sessionSource__catalogId
            String[] keyItems = entry.getKey().split("__");
            if (keyItems.length != 5) {
                logger.warning("DailyProductWithSourceAggregator : malformed result key : "
                        + entry.getKey());
                continue;
            }
            Long catalogItemId;
            try {
                catalogItemId = Long.parseLong(keyItems[4]);
            } catch (NumberFormatException e) {
                logger.warning("NumberFormatException : " + keyItems[4]);
                continue;
            }
            itemActivities.add(toItemActivity(keyItems, catalogItemId, entry.getValue()));
        }
        return itemActivities;
    }

    /** Builds one activity row from the split result key and its counter map. */
    private ItemActivityWithSource toItemActivity(String[] keyItems, Long catalogItemId,
            Map<String, Long> itemMap) {
        ItemActivityWithSource itemActivity = new ItemActivityWithSource();
        // Only an explicit "1" marks a paid source; an empty flag (session whose
        // source could not be determined) must not be reported as paid.
        itemActivity.setIsPaidFirstSource(Boolean.valueOf("1".equals(keyItems[0])));
        itemActivity.setFirstSource(keyItems[1]);
        itemActivity.setIsPaidSessionSource(Boolean.valueOf("1".equals(keyItems[2])));
        itemActivity.setSessionSource(keyItems[3]);
        itemActivity.setCatalogId(catalogItemId);
        itemActivity.setDate(fromDate);

        itemActivity.setView(itemMap.get(VIEW_COUNT_KEY));
        itemActivity.setAddToCart(itemMap.get(ADD_TO_CART_COUNT_KEY));
        itemActivity.setAddToResearch(itemMap.get(ADD_TO_RESEARCH_COUNT_KEY));
        itemActivity.setDeleteFromCart(itemMap.get(DELETE_FROM_CART_COUNT_KEY));
        itemActivity.setDeleteFromResearch(itemMap.get(DELETE_FROM_RESEARCH_COUNT_KEY));
        itemActivity.setShippingAccess(itemMap.get(SHIPPING_ACCESS_COUNT_KEY));
        itemActivity.setPaymentSuccess(itemMap.get(PAYMENT_SUCCESS_COUNT_KEY));
        itemActivity.setOrderCreation(itemMap.get(ORDER_CREATION_COUNT_KEY));
        itemActivity.setPaymentFailure(itemMap.get(PAYMENT_FAILURE_COUNT_KEY));
        itemActivity.setProceedToPay(itemMap.get(PROCEED_TO_PAY_COUNT_KEY));

        itemActivity.setUniqueView(itemMap.get(UNIQUE_VIEW_COUNT_KEY));
        itemActivity.setUniqueAddToCart(itemMap.get(UNIQUE_ADD_TO_CART_COUNT_KEY));
        itemActivity.setUniqueAddToResearch(itemMap.get(UNIQUE_ADD_TO_RESEARCH_COUNT_KEY));
        itemActivity.setUniqueDeleteFromCart(itemMap.get(UNIQUE_DELETE_FROM_CART_COUNT_KEY));
        // Previously read UNIQUE_ADD_TO_RESEARCH_COUNT_KEY here (copy-paste slip).
        itemActivity.setUniqueDeleteFromResearch(itemMap.get(UNIQUE_DELETE_FROM_RESEARCH_COUNT_KEY));
        itemActivity.setUniqueShippingAccess(itemMap.get(UNIQUE_SHIPPING_ACCESS_COUNT_KEY));
        itemActivity.setUniquePaymentSuccess(itemMap.get(UNIQUE_PAYMENT_SUCCESS_COUNT_KEY));
        itemActivity.setUniqueOrderCreation(itemMap.get(UNIQUE_ORDER_CREATION_COUNT_KEY));
        itemActivity.setUniquePaymentFailure(itemMap.get(UNIQUE_PAYMENT_FAILURE_COUNT_KEY));
        itemActivity.setUniqueProceedToPay(itemMap.get(UNIQUE_PROCEED_TO_PAY_COUNT_KEY));
        return itemActivity;
    }

    /**
     * Handles GET exactly like POST, so the aggregation can be triggered with
     * either HTTP method.
     *
     * @param req  request forwarded unchanged to {@code doPost}
     * @param resp response forwarded unchanged to {@code doPost}
     */
    public void doGet(HttpServletRequest req, HttpServletResponse resp) {
        this.doPost(req, resp);
    }

    /**
     * Counts the events of type {@code event} that fall inside the current query
     * window and adds them to itemsResultMap, grouped by the session's source
     * attributes and the catalog id of the viewed item.
     *
     * <p>The catalog id is read from the event's item id property; when that is
     * missing, the product name is looked up in itemsNameMap. Events whose item
     * cannot be resolved are logged and skipped (they could not be persisted
     * anyway because their group key would not end in a numeric id).
     *
     * @param event              event type to query
     * @param viewCountKey       counter incremented for every event
     * @param uniqueViewCountKey counter incremented once per session/item pair
     *                           within this call
     */
    private void UpdateViewCount(String event, String viewCountKey, String uniqueViewCountKey) {
        Query query = new Query(KIND);
        query.addFilter(EVENTTYPE_FIELD, Query.FilterOperator.EQUAL, event);
        query.addFilter(DATE_FIELD, Query.FilterOperator.GREATER_THAN_OR_EQUAL, queryFromDate);
        query.addFilter(DATE_FIELD, Query.FilterOperator.LESS_THAN, queryToDate);
        PreparedQuery pq = datastore.prepare(query);

        // Drain the query into memory before processing; the loop below issues
        // further datastore queries through getSource.
        logger.warning("DailyProductWithSourceAggregator : Getting View Events in list");
        List<Entity> entityResults = new ArrayList<Entity>();
        for (Entity e : pq.asIterable(FetchOptions.Builder.withChunkSize(500))) {
            entityResults.add(e);
        }
        logger.warning("DailyProductWithSourceAggregator : Done Getting View Events in list");

        Set<String> uniqueSet = new HashSet<String>();
        for (Entity result : entityResults) {
            Long catalogItemId = (Long) result.getProperty(ITEMID_FIELD);
            if (catalogItemId == null) {
                // Fall back to the product name; it may be absent as well.
                Object name = result.getProperty(PRODUCTNAME_FIELD);
                if (name instanceof String) {
                    Item namedItem = itemsNameMap.get(((String) name).trim());
                    if (namedItem != null) {
                        catalogItemId = namedItem.getCatalogId();
                    }
                }
            }
            if (catalogItemId == null) {
                logger.warning("DailyProductWithSourceAggregator : skipping view event without resolvable item : "
                        + result.getKey());
                continue;
            }

            String sessionId = (String) result.getProperty(SESSION_ID_FIELD);
            String firstSource = getSource(sessionId, FIRST_SOURCE_KEY);
            String sessionSource = getSource(sessionId, SESSION_SOURCE_KEY);
            String paidFirstSource = getSource(sessionId, FIRST_SOURCE_PAID_KEY);
            String paidSessionSource = getSource(sessionId, SESSION_SOURCE_PAID_KEY);
            String key = paidFirstSource + "__" + firstSource + "__"
                    + paidSessionSource + "__" + sessionSource + "__"
                    + catalogItemId;

            Map<String, Long> itemMap = itemsResultMap.get(key);
            if (itemMap == null) {
                itemMap = new HashMap<String, Long>();
                itemsResultMap.put(key, itemMap);
            }
            Long views = itemMap.get(viewCountKey);
            itemMap.put(viewCountKey, views == null ? 1L : views + 1L);

            // Set.add returns true only the first time this session/item pair is seen.
            if (uniqueSet.add(sessionId + "_" + catalogItemId)) {
                Long uniqueViews = itemMap.get(uniqueViewCountKey);
                itemMap.put(uniqueViewCountKey, uniqueViews == null ? 1L : uniqueViews + 1L);
            }
        }
    }
    
    /**
     * Bulk-fills sessionIdSourceMap from the NEW_SESSION events of the current
     * query window, keeping only sessions contained in {@code sessionIds}.
     *
     * <p>For each matching session the first-source and session-source values
     * are reduced to a lower-cased host name (scheme, path and "www." removed,
     * at most 20 characters) and flagged "1" when the raw value starts with
     * "PAID :", otherwise "0". This method currently has no caller in this class.
     *
     * @param sessionIds session ids of interest; null or empty means nothing to do
     */
    private void populateSessionIdSourceMap(Set<String> sessionIds) {
        if (sessionIds == null || sessionIds.isEmpty()) {
            logger.warning("DailyProductWithSourceAggregator : polpulateSessionIds : "
                    + sessionIdSourceMap.size());
            return;
        }

        Query sessionQuery = new Query(KIND);
        sessionQuery.addFilter(EVENTTYPE_FIELD, Query.FilterOperator.EQUAL,
                NEW_SESSION_EVENT);
        sessionQuery.addFilter(DATE_FIELD, Query.FilterOperator.GREATER_THAN_OR_EQUAL, queryFromDate);
        sessionQuery.addFilter(DATE_FIELD, Query.FilterOperator.LESS_THAN, queryToDate);

        PreparedQuery sessionPq = datastore.prepare(sessionQuery);

        for (Entity sessionResult : sessionPq.asIterable(FetchOptions.Builder.withChunkSize(500))) {
            String sessionId = (String) sessionResult.getProperty(SESSION_ID_FIELD);
            if (!sessionIds.contains(sessionId)) {
                continue;
            }

            String firstSource = "";
            String isPaidFirstSource = "0";
            Object rawFirst = sessionResult.getProperty(FIRST_SOURCE_FIELD);
            if (rawFirst != null) {
                firstSource = rawFirst.toString();
                isPaidFirstSource = firstSource.startsWith("PAID :") ? "1" : "0";
                // The dot is escaped so only a literal "www." is removed.
                firstSource = firstSource.toLowerCase().replaceFirst(".*?http[s]?://", "")
                        .replaceAll("/.*", "").replaceAll("www\\.", "");
                if (firstSource.length() > 20) {
                    firstSource = firstSource.substring(0, 20);
                }
            }

            String sessionSource = "";
            String isPaidSessionSource = "0";
            Object rawSession = sessionResult.getProperty(SESSION_SOURCE_FIELD);
            if (rawSession != null) {
                sessionSource = rawSession.toString();
                isPaidSessionSource = sessionSource.startsWith("PAID :") ? "1" : "0";
                sessionSource = sessionSource.toLowerCase().replaceFirst(".*?http[s]?://", "")
                        .replaceAll("/.*", "").replaceAll("www\\.", "");
                if (sessionSource.length() > 20) {
                    sessionSource = sessionSource.substring(0, 20);
                }
            }

            Map<String, String> sessionSourcesMap = new HashMap<String, String>();
            sessionSourcesMap.put(FIRST_SOURCE_KEY, firstSource);
            sessionSourcesMap.put(SESSION_SOURCE_KEY, sessionSource);
            sessionSourcesMap.put(FIRST_SOURCE_PAID_KEY, isPaidFirstSource);
            sessionSourcesMap.put(SESSION_SOURCE_PAID_KEY, isPaidSessionSource);
            sessionIdSourceMap.put(sessionId, sessionSourcesMap);
        }
        logger.warning("DailyProductWithSourceAggregator : polpulateSessionIds : "
                + sessionIdSourceMap.size());
    }

    /**
     * Returns one source attribute of a session, loading and caching the
     * session's attributes from its NEW_SESSION event on first use.
     *
     * <p>When the session id is null or empty, or no NEW_SESSION event exists
     * for it, the defaults are used: empty source names and "0" for both paid
     * flags. The defaults are cached too, so an unknown session costs a single
     * datastore query instead of one per attribute per event, and its paid flags
     * are the same "0" a session without source properties gets.
     *
     * @param sessionId session to look up; may be null
     * @param sourceKey one of the FIRST_SOURCE_KEY, SESSION_SOURCE_KEY,
     *                  FIRST_SOURCE_PAID_KEY or SESSION_SOURCE_PAID_KEY values
     * @return the attribute value, or null when {@code sourceKey} is not one of
     *         the four known keys
     */
    private String getSource(String sessionId, String sourceKey) {
        Map<String, String> sources = sessionIdSourceMap.get(sessionId);
        if (sources == null) {
            sources = lookupSessionSources(sessionId);
            if (sources == null) {
                logger.warning("DailyProductWithSourceAggregator : sessionIdSourceMap not found : "
                        + sessionId);
                sources = new HashMap<String, String>();
                sources.put(FIRST_SOURCE_KEY, "");
                sources.put(SESSION_SOURCE_KEY, "");
                sources.put(FIRST_SOURCE_PAID_KEY, "0");
                sources.put(SESSION_SOURCE_PAID_KEY, "0");
            }
            sessionIdSourceMap.put(sessionId, sources);
        }
        return sources.get(sourceKey);
    }

    /**
     * Queries the NEW_SESSION events of one session and returns its source
     * attributes, or null when the id is null/empty or no event is found. If
     * several events exist, the last one returned by the query wins.
     */
    private Map<String, String> lookupSessionSources(String sessionId) {
        if (sessionId == null || sessionId.isEmpty()) {
            return null;
        }
        Query sessionQuery = new Query(KIND);
        sessionQuery.addFilter(EVENTTYPE_FIELD, Query.FilterOperator.EQUAL,
                NEW_SESSION_EVENT);
        sessionQuery.addFilter(SESSION_ID_FIELD,
                Query.FilterOperator.EQUAL, sessionId);

        Map<String, String> found = null;
        PreparedQuery sessionPq = datastore.prepare(sessionQuery);
        for (Entity sessionResult : sessionPq.asIterable()) {
            found = describeSessionSources(sessionResult);
        }
        return found;
    }

    /** Extracts the four source attributes from one NEW_SESSION entity. */
    private Map<String, String> describeSessionSources(Entity sessionResult) {
        Object rawFirst = sessionResult.getProperty(FIRST_SOURCE_FIELD);
        Object rawSession = sessionResult.getProperty(SESSION_SOURCE_FIELD);
        String firstSource = rawFirst == null ? "" : rawFirst.toString();
        String sessionSource = rawSession == null ? "" : rawSession.toString();

        Map<String, String> sessionSourcesMap = new HashMap<String, String>();
        sessionSourcesMap.put(FIRST_SOURCE_KEY, normalizeSourceHost(firstSource));
        sessionSourcesMap.put(SESSION_SOURCE_KEY, normalizeSourceHost(sessionSource));
        // The paid marker is a prefix of the raw, un-normalised value.
        sessionSourcesMap.put(FIRST_SOURCE_PAID_KEY, firstSource.startsWith("PAID :") ? "1" : "0");
        sessionSourcesMap.put(SESSION_SOURCE_PAID_KEY, sessionSource.startsWith("PAID :") ? "1" : "0");
        return sessionSourcesMap;
    }

    /**
     * Reduces a raw source value to a lower-cased host name: everything up to
     * and including "http://" or "https://" is dropped, then everything from
     * the first "/", then any literal "www."; the result is cut to 20 characters.
     */
    private String normalizeSourceHost(String rawSource) {
        String host = rawSource.toLowerCase()
                .replaceFirst(".*?http[s]?://", "")
                .replaceAll("/.*", "")
                // Escaped dot: the unescaped form also removed e.g. "www2" or "wwwa".
                .replaceAll("www\\.", "");
        return host.length() > 20 ? host.substring(0, 20) : host;
    }

    /**
     * Counts the events of type {@code event} that fall inside the current query
     * window and adds them to itemsResultMap, once per item id carried by each
     * event, grouped by the session's source attributes and the catalog id.
     *
     * <p>Events without an item-ids property, or whose property holds no usable
     * id, are skipped. When {@code isCatalogIdEvent} is false, each id is treated
     * as an item id and translated to its catalog id through itemsMap; ids that
     * are not in itemsMap are used unchanged.
     *
     * @param event            event type to query
     * @param countKey         counter incremented for every event/item pair
     * @param uniqueCountKey   counter incremented once per session/item pair
     *                         within this call
     * @param isCatalogIdEvent true when the event already carries catalog ids
     */
    private void UpdateItemIdEventCount(String event, String countKey, String uniqueCountKey, boolean isCatalogIdEvent) {
        Query query = new Query(KIND);
        query.addFilter(EVENTTYPE_FIELD, Query.FilterOperator.EQUAL, event);
        query.addFilter(DATE_FIELD, Query.FilterOperator.GREATER_THAN_OR_EQUAL, queryFromDate);
        query.addFilter(DATE_FIELD, Query.FilterOperator.LESS_THAN, queryToDate);
        PreparedQuery pq = datastore.prepare(query);

        // Copy the results into memory before processing; the loop below issues
        // further datastore queries through getSource.
        List<Entity> results = new ArrayList<Entity>(
                pq.asList(FetchOptions.Builder.withChunkSize(500)));

        Set<String> uniqueSet = new HashSet<String>();
        for (Entity result : results) {
            if (!result.hasProperty(ITEMIDS_FIELD)) {
                continue;
            }
            List<Long> itemIds = toItemIdList(result.getProperty(ITEMIDS_FIELD));
            if (itemIds.isEmpty()) {
                continue;
            }

            // Resolved only for events that actually contribute counts.
            String sessionId = (String) result.getProperty(SESSION_ID_FIELD);
            String firstSource = getSource(sessionId, FIRST_SOURCE_KEY);
            String sessionSource = getSource(sessionId, SESSION_SOURCE_KEY);
            String paidFirstSource = getSource(sessionId, FIRST_SOURCE_PAID_KEY);
            String paidSessionSource = getSource(sessionId, SESSION_SOURCE_PAID_KEY);
            String keyPrefix = paidFirstSource + "__" + firstSource + "__"
                    + paidSessionSource + "__" + sessionSource + "__";

            for (Long itemId : itemIds) {
                Long catalogId = itemId;
                if (!isCatalogIdEvent) {
                    Item item = itemsMap.get(itemId);
                    if (item != null) {
                        catalogId = item.getCatalogId();
                    }
                }

                String key = keyPrefix + catalogId;
                Map<String, Long> itemMap = itemsResultMap.get(key);
                if (itemMap == null) {
                    itemMap = new HashMap<String, Long>();
                    itemsResultMap.put(key, itemMap);
                }
                Long count = itemMap.get(countKey);
                itemMap.put(countKey, count == null ? 1L : count + 1L);

                // The separator keeps e.g. session "ab1" + item 2 distinct from
                // session "ab" + item 12, and matches the key used for view events.
                if (uniqueSet.add(sessionId + "_" + catalogId)) {
                    Long uniqueCount = itemMap.get(uniqueCountKey);
                    itemMap.put(uniqueCountKey, uniqueCount == null ? 1L : uniqueCount + 1L);
                }
            }
        }
    }

    /**
     * Normalises the raw item-ids property value to a list of non-null Long
     * ids. The value may be a single Long or a list; anything else, and any
     * list element that is not a Long, is logged and ignored.
     */
    private List<Long> toItemIdList(Object rawItemIds) {
        List<Long> itemIds = new ArrayList<Long>();
        if (rawItemIds instanceof Long) {
            itemIds.add((Long) rawItemIds);
        } else if (rawItemIds instanceof List) {
            for (Object element : (List<?>) rawItemIds) {
                if (element instanceof Long) {
                    itemIds.add((Long) element);
                } else if (element != null) {
                    logger.warning("DailyProductWithSourceAggregator : ignoring non-numeric item id : "
                            + element);
                }
            }
        } else if (rawItemIds != null) {
            logger.warning("DailyProductWithSourceAggregator : unexpected itemIds type : "
                    + rawItemIds.getClass().getName());
        }
        return itemIds;
    }
}