Rev 17330 | Blame | Compare with Previous | Last modification | View Log | RSS feed
import MySQLdbfrom dtr.storage.DataService import brand_preferencesfrom elixir import *from dtr.storage import DataService, Mongofrom dtr.storage.DataService import user_filtersfrom sqlalchemy.sql.expression import func, betweenfrom datetime import datetime,timedeltafrom UserSegmentationLinksMaster import get_mongo_connection_newimport refrom dtr.utils.utils import to_java_dateDataService.initialize(echo=True)DB_HOST = "localhost"DB_USER = "root"DB_PASSWORD = "shop2020"DB_NAME = "dtr"USER_SEGMENTATION_LINK_QUERY="""select x.user_id,x.brand,x.category_id,x.count,x.source,x.weightfrom(select c.user_id,c.brand,c.category_id,count(c.id) as count,"clicks" as source,(uw.weightage * count(c.id)) as weightfrom clicks c join user_segmentation_weigthage uw on "clicks"=uw.weightage_typewhere c.category_id is not null and c.category_id !=0 and c.brand is not null andc.brand not like '' and c.user_id!=0 and datediff(curdate(),c.created)<7 group by c.category_id,c.brand,c.user_idUNIONselect m.userId as user_id,m.brand,m.categoryId as category_id,count(1) as count,"orders" as source,(uw.weightage * count(1)) as weightfrom merchantsuborders m join user_segmentation_weigthage uw on "orders"=uw.weightage_typewhere m.categoryId is not null and m.brand is not null and m.brand not like 'NA' and m.userId!=0 and datediff(curdate(),m.createdTime)<7 group by m.categoryId,m.brand,m.userIdUNIONselect fo.user_id,fo.brand,fo.category as category_id,count(fo.id) as count,"orders" as source, (uw.weightage * count(fo.id)) as weightfrom flipkartorders fo join user_segmentation_weigthage uw on "orders"=uw.weightage_typewhere fo.category is not null and fo.brand is not null and fo.brand not like '' and fo.user_id!=0 and fo.user_id is not nulland datediff(curdate(),fo.created)<7 group by fo.category,fo.brand,fo.user_idUNIONselect bp.user_id,bp.brand,bp.category_id,count(1) as count,"preferences" as source,(uw.weightage* count(1)) as weightfrom brand_preferences bp join user_segmentation_weigthage uw on "preferences"=uw.weightage_typewhere user_id!=0 and status='show' and datediff(curdate(),bp.created)<7 group by bp.category_id,bp.brand,bp.user_id) as x"""USER_STORE_PRODUCT_VIEW="""SELECT u.user_id,count(u.id) AS count, SUBSTRING_INDEX(SUBSTRING_INDEX(SUBSTRING_INDEX(SUBSTRING_INDEX(u.url, '/', -1 ),'?',1),'%',1),'#',1)AS skuBundleId,"product_view" as source,(uw.weightage* count(u.id)) as weightFROM user_urls u join user_segmentation_weigthage uw on "product_view"=uw.weightage_typeWHERE u.url LIKE '%store_products/view%' and u.user_id!=0 andSUBSTRING_INDEX(SUBSTRING_INDEX(SUBSTRING_INDEX(SUBSTRING_INDEX(u.url, '/', -1 ),'?',1),'%',1),'#',1) not like ''and datediff(curdate(),u.created)<7group by u.user_id,skuBundleId;"""REPLACE_QUERY="""REPLACE INTO newuserlinkssegmentation( user_id, brand, source,category_id, count, weight,created,modified)SELECT user_id, brand, source,category_id, count, weight,created,now()FROM newuserlinkssegmentation7daysWHERE user_id =%s and brand=%s and category_id=%s and source=%s"""userMap = {} #key - user_id val - brand mapuserMobileCategory={}userTabletCategory={}def getDbConnection():return MySQLdb.connect(DB_HOST, DB_USER, DB_PASSWORD, DB_NAME)def deleteUserForLinks():print '********** Deleting records from last 7 days tables *********'conn = getDbConnection()cursor = conn.cursor()deleteAllUserGroups = "delete from newuserlinkssegmentation7days"try:cursor.execute(deleteAllUserGroups)conn.commit()except:conn.rollback()finally:conn.close()print '********** Completed Deleting records from last 7 days tables *********'def addUsersForLinks():print '********** Inserting records in last 7 days table *********'user_id=''brand=''category_id=''source=''count=''weight=''datesql= USER_SEGMENTATION_LINK_QUERYconn = getDbConnection()cursor = conn.cursor()cursor.execute(datesql)result = cursor.fetchall()for r in result:user_id=r[0]brand=r[1]if r[2] == 'Mobiles':category_id=3elif r[2] == 'Tablets':category_id=5else:category_id=r[2]count=r[3]source=r[4]weight=r[5]if brand is not 'NA':sql = "replace into newuserlinkssegmentation7days (user_id, brand, category_id, source, count,weight,created) values(%s, %s, %s, %s, %s,%s,%s)"cursor.execute(sql,(user_id,brand, category_id, source, count,weight,datetime.now()))conn.commit()conn.close()print '********** Completed inserting records for orders,preferences and clicks in last 7 days table *********'def filters():print '********** Inserting records for filters in last 7 day table *********'conn = getDbConnection()cursor = conn.cursor()cursor.execute("select weightage from user_segmentation_weigthage where weightage_type='filters'")result = cursor.fetchall()filter_weight=result[0][0]todayDate=datetime.today().date()data = session.query((user_filters)).filter_by(type="brand").filter(~(user_filters.filters).like('NA')).filter(user_filters.created.between(todayDate-timedelta(days=6),todayDate)).all()for d in data:if userMap.has_key(d.user_id):brandMap = userMap.get(d.user_id)else:brandMap = {}brands = d.filters.split('|')for brand in brands:if brandMap.has_key(brand):brandMap[brand] = brandMap.get(brand) + 1else:brandMap[brand] = 1userMap[d.user_id] = brandMapfor x, y in userMap.iteritems():for bra,cow in y.iteritems():user_id=xbrand=bracount=cowcategory_id=3weight=cow*filter_weightsource='filters'if brand is not 'NA':sql = "replace into newuserlinkssegmentation7days (user_id, brand, category_id, source, count,weight,created) values(%s, %s, %s, %s, %s,%s,%s)"cursor.execute(sql,(user_id,brand, category_id, source, count,weight,datetime.now()))conn.commit()conn.close()print '********** Completed inserting records for filters in last 7 day table *********'def storeProductView():print '********** Inserting records for store product view in last 7 day table *********'user_id=''brand=''category_id=''source=''count=''weight=''result = get_mongo_connection_new().User.browsinghistories.find({"$and":[{"url":{"$regex" : "/store_products\/view/"}},{"created":{"$gte":getLastSeventhDay()}}]})for r in result:matchObj = re.search('store_products/view/(.+?)/(.+?)$',r.get('url'))if matchObj:skuBundleId = matchObj.group(2)if 'user_id' in skuBundleId or '=' in skuBundleId:newMatchObj = re.search('(.+?)\?',skuBundleId)if newMatchObj:skuBundleId=newMatchObj.group(1)else:print 'No match userId', skuBundleIdcontinueelif '#' in skuBundleId:newMatchObj = re.search('(.+?)\#',skuBundleId)if newMatchObj:skuBundleId=newMatchObj.group(1)else:print 'No match #', skuBundleIdcontinueif '%' in skuBundleId:newMatchObj = re.search('(.+?)\%',skuBundleId)if newMatchObj:skuBundleId=newMatchObj.group(1)else:print 'No match', skuBundleIdelse:print "No match!!"print skuBundleIdif skuBundleId.isdigit():print 'Inside is digit true'skuBundleId = int(skuBundleId)else:print 'Inside is digit false'skuBundleId = int(float(skuBundleId))skuData = Mongo.getSkuBrandData(skuBundleId)if len(skuData) !=0:user_id=r.get('user_id')brand=skuData['brand']category_id =skuData['category_id']source='product_view'count=1if category_id==3:if userMobileCategory.has_key(user_id):brandMap = userMobileCategory.get(user_id)else:brandMap = {}if brandMap.has_key(brand):brandMap[brand] = brandMap.get(brand) + 1else:brandMap[brand] = 1userMobileCategory[user_id] = brandMapif category_id==5:if userTabletCategory.has_key(user_id):brandCategoryMap = userTabletCategory.get(user_id)else:brandCategoryMap = {}if brandCategoryMap.has_key(brand):brandCategoryMap[brand] = brandCategoryMap.get(brand) + 1else:brandCategoryMap[brand] = 1userTabletCategory[user_id] = brandCategoryMapelse:print 'Sku bundle Id not present',r.get('url')conn=getDbConnection()cursor=conn.cursor()cursor.execute("select weightage from user_segmentation_weigthage where weightage_type='product_view'")result = cursor.fetchall()product_view_weight=result[0][0]for x, y in userMobileCategory.iteritems():for bra,cow in y.iteritems():user_id=xbrand= bracount=cowweight=cow*product_view_weightsource='product_view'category_id=3sql = "replace into newuserlinkssegmentation7days (user_id, brand, category_id, source, count,weight,created) values(%s, %s, %s, %s, %s,%s,%s)"cursor.execute(sql,(user_id,brand, category_id, source, count,weight,datetime.now()))conn.commit()for x, y in userTabletCategory.iteritems():for bra,cow in y.iteritems():user_id=xbrand= bracount=cowweight=cow*product_view_weightsource='product_view'category_id=5sql = "replace into newuserlinkssegmentation7days (user_id, brand, category_id, source, count,weight,created) values(%s, %s, %s, %s, %s,%s,%s)"cursor.execute(sql,(user_id,brand, category_id, source, count,weight,datetime.now()))conn.commit()conn.close()print '********** Completed inserting records for store product view in last 7 day table *********'def findRepeatingOrders():print '********** Call to sanitize data for orders if any multiple *********'datesql= "select user_id,brand,category_id from userlinkssegmentation7days where source='orders' group by user_id,brand,category_id having count(source)=2;"conn = getDbConnection()cursor = conn.cursor()rows_count = cursor.execute(datesql)if rows_count>0:result = cursor.fetchall()for r in result:print rfindDetails(r[0],r[1],r[2])else:print 'No Data to be sanitized for repeating orders'def findDetails(userId,brand,category):datesql= "select sum(count),sum(weight) from userlinkssegmentation7days where user_id=%s and source='orders' and brand=%s group by source;"conn = getDbConnection()cursor = conn.cursor()cursor.execute(datesql,(userId,brand))result = cursor.fetchall()for r in result:deletesql="delete from userlinkssegmentation7days where user_id=%s and brand=%s and source='orders'"cursor = conn.cursor()cursor.execute(deletesql,(userId,brand))conn.commit()insertsql="insert into userlinkssegmentation7days (user_id, brand, category_id, source, count,weight) values(%s, %s, %s, %s, %s,%s)"cursor = conn.cursor()cursor.execute(insertsql,(userId,brand,category,'orders',r[0],r[1]))conn.commit()def findRepeatingClicks():print '********** Call to sanitize data for product view if any multiple *********'datesql= "select user_id,brand,category_id from userlinkssegmentation7days where source='product_view' group by user_id,brand,category_id having count(source)=2;"conn = getDbConnection()cursor = conn.cursor()rows_count = cursor.execute(datesql)if rows_count>0:result = cursor.fetchall()for r in result:print rfindViewDetails(r[0],r[1],r[2])else:print 'No Data to be sanitized for product view'def findViewDetails(userId,brand,category):datesql= "select sum(count),sum(weight) from userlinkssegmentation7days where user_id=%s and source='product_view' and brand=%s group by source;"conn = getDbConnection()cursor = conn.cursor()cursor.execute(datesql,(userId,brand))result = cursor.fetchall()for r in result:deletesql="delete from userlinkssegmentation7days where user_id=%s and brand=%s and source='product_view'"cursor = conn.cursor()cursor.execute(deletesql,(userId,brand))conn.commit()insertsql="insert into userlinkssegmentation7days (user_id, brand, category_id, source, count,weight) values(%s, %s, %s, %s, %s,%s)"cursor = conn.cursor()cursor.execute(insertsql,(userId,brand,category,'product_view',r[0],r[1]))conn.commit()def checkCountForUpdate():print '********** Checking users brand whose cumulative weight is greater than 3 *********'datesql= "select user_id,brand,category_id from newuserlinkssegmentation7days group by user_id,brand,category_id having sum(weight) >=3;"conn = getDbConnection()cursor = conn.cursor()cursor.execute(datesql)result = cursor.fetchall()for r in result:checkUserForUpdate(r[0],r[1],r[2])conn.close()def checkUserForUpdate(userId,brand,category):print '********** Checking users brand present or not in the master table *********'sources = ['product_view','clicks','filters','orders','preferences']datesql= "select distinct brand from newuserlinkssegmentation where user_id=%s and brand=%s and category_id=%s"conn = getDbConnection()cursor = conn.cursor()cursor.execute(datesql,(userId,brand,category))result = cursor.fetchall()print 'Result in Check user for update',resultif len(result)==0:insertNewUserLinkDetails(userId,brand,category)else:datesql= "select distinct source from newuserlinkssegmentation where user_id=%s and brand=%s and category_id=%s"cursor.execute(datesql,(userId,result[0][0],category))newResult = cursor.fetchall()print 'Data for source',newResultfor source in newResult:print 'Source',source[0]userReplaceData(userId,brand,category, source[0])sources.remove(source[0])for source in sources:insertNewUserLinkDetailsSourceWise(userId,brand,category, source)conn.close()def userReplaceData(userId,brand,category,source):print 'Data that needs to be updated for', userId,brand,categoryconn = getDbConnection()cursor = conn.cursor()print REPLACE_QUERY%(userId,brand,category,source)cursor.execute(REPLACE_QUERY,(userId,brand,category,source))conn.commit()conn.close()def insertNewUserLinkDetails(userId,brand,category):print 'Data needs to be inserted as the brand was not present earlier', userId,brand,categoryconn = getDbConnection()selectNewUser="select * from newuserlinkssegmentation7days where user_id=%s and brand=%s and category_id=%s"cursor = conn.cursor()cursor.execute(selectNewUser,(userId,brand,category))result = cursor.fetchall()print 'Result while inserting new record in master',resultfor r in result:sql = "insert into newuserlinkssegmentation (user_id, brand, category_id, source, count,weight,created,modified) values(%s, %s, %s, %s, %s,%s,%s,%s)"cursor.execute(sql,(r[0],r[1],r[2], r[3], r[4],r[5],datetime.now(),datetime.now()))conn.commit()conn.close()def insertNewUserLinkDetailsSourceWise(userId,brand,category,source):print 'Data needs to be inserted as the source was not present earlier', userId,brand,category,sourceconn = getDbConnection()selectNewUser="select * from newuserlinkssegmentation7days where user_id=%s and brand=%s and category_id=%s and source=%s"cursor = conn.cursor()cursor.execute(selectNewUser,(userId,brand,category,source))result = cursor.fetchall()print 'Result while inserting new record in master',resultfor r in result:sql = "insert into newuserlinkssegmentation (user_id, brand, category_id, source, count,weight,created,modified) values(%s, %s, %s, %s, %s,%s,%s,%s)"cursor.execute(sql,(r[0],r[1],r[2], r[3], r[4],r[5],datetime.now(),datetime.now()))conn.commit()conn.close()def getLastSeventhDay():return to_java_date(datetime.today() - timedelta(days=7))def main():print datetime.now()print '********** Starting to update links for Users *********'deleteUserForLinks()addUsersForLinks()filters()storeProductView()#findRepeatingOrders()#findRepeatingClicks()checkCountForUpdate()print '********** Finished updating records for user links *********'if __name__=='__main__':main()