# Rev 16625 | Blame | Compare with Previous | Last modification | View Log | RSS feed
import MySQLdbfrom dtr.storage.DataService import brand_preferencesfrom elixir import *from dtr.storage import DataService, Mongofrom dtr.storage.DataService import user_filtersfrom sqlalchemy.sql.expression import func, betweenfrom datetime import datetime,timedeltaDataService.initialize(echo=True)DB_HOST = "localhost"DB_USER = "root"DB_PASSWORD = "shop2020"DB_NAME = "dtr"USER_SEGMENTATION_LINK_QUERY="""select x.user_id,x.brand,x.category_id,x.count,x.source,x.weightfrom(select c.user_id,c.brand,c.category_id,count(c.id) as count,"clicks" as source,(uw.weightage * count(c.id)) as weightfrom clicks c join user_segmentation_weigthage uw on "clicks"=uw.weightage_typewhere c.category_id is not null and c.category_id !=0 and c.brand is not null andc.brand not like '' and c.user_id!=0 and datediff(curdate(),c.created)<7 group by c.category_id,c.brand,c.user_idUNIONselect m.userId as user_id,m.brand,m.categoryId as category_id,count(1) as count,"orders" as source,(uw.weightage * count(1)) as weightfrom merchantsuborders m join user_segmentation_weigthage uw on "orders"=uw.weightage_typewhere m.categoryId is not null and m.brand is not null and m.userId!=0 and datediff(curdate(),m.createdTime)<7 group by m.categoryId,m.brand,m.userIdUNIONselect fo.user_id,fo.brand,fo.category as category_id,count(fo.id) as count,"orders" as source, (uw.weightage * count(fo.id)) as weightfrom flipkartorders fo join user_segmentation_weigthage uw on "orders"=uw.weightage_typewhere fo.category is not null and fo.brand is not null and fo.brand not like '' and fo.user_id!=0 and fo.user_id is not nulland datediff(curdate(),fo.created)<7 group by fo.category,fo.brand,fo.user_idUNIONselect bp.user_id,bp.brand,bp.category_id,count(1) as count,"preferences" as source,(uw.weightage* count(1)) as weightfrom brand_preferences bp join user_segmentation_weigthage uw on "preferences"=uw.weightage_typewhere user_id!=0 and status='show' and datediff(curdate(),bp.created)<7 group by 
bp.category_id,bp.brand,bp.user_id) as x"""USER_STORE_PRODUCT_VIEW="""SELECT u.user_id,count(u.id) AS count, SUBSTRING_INDEX(SUBSTRING_INDEX(SUBSTRING_INDEX(SUBSTRING_INDEX(u.url, '/', -1 ),'?',1),'%',1),'#',1)AS skuBundleId,"product_view" as source,(uw.weightage* count(u.id)) as weightFROM user_urls u join user_segmentation_weigthage uw on "product_view"=uw.weightage_typeWHERE u.url LIKE '%store_products/view%' and u.user_id!=0 andSUBSTRING_INDEX(SUBSTRING_INDEX(SUBSTRING_INDEX(SUBSTRING_INDEX(u.url, '/', -1 ),'?',1),'%',1),'#',1) not like ''and datediff(curdate(),u.created)<7group by u.user_id,skuBundleId;"""REPLACE_QUERY="""REPLACE INTO userlinkssegmentation( user_id, brand, source,category_id, count, weight )SELECT user_id, brand, source,category_id, count, weightFROM userlinkssegmentation7daysWHERE user_id =%s and brand=%s and category_id=%s"""userMap = {} #key - user_id val - brand mapuserMobileCategory={}userTabletCategory={}def getDbConnection():return MySQLdb.connect(DB_HOST, DB_USER, DB_PASSWORD, DB_NAME)def deleteUserForLinks():print '********** Deleting records from last 7 days tables *********'conn = getDbConnection()cursor = conn.cursor()deleteAllUserGroups = "delete from userlinkssegmentation7days"try:cursor.execute(deleteAllUserGroups)conn.commit()except:conn.rollback()finally:conn.close()print '********** Completed Deleting records from last 7 days tables *********'def addUsersForLinks():print '********** Inserting records in last 7 days table *********'user_id=''brand=''category_id=''source=''count=''weight=''datesql= USER_SEGMENTATION_LINK_QUERYconn = getDbConnection()cursor = conn.cursor()cursor.execute(datesql)result = cursor.fetchall()for r in result:user_id=r[0]brand=r[1]if r[2] == 'Mobiles':category_id=3elif r[2] == 'Tablets':category_id=5else:category_id=r[2]count=r[3]source=r[4]weight=r[5]sql = "insert into userlinkssegmentation7days (user_id, brand, category_id, source, count,weight) values(%s, %s, %s, %s, 
%s,%s)"cursor.execute(sql,(user_id,brand, category_id, source, count,weight))conn.commit()conn.close()print '********** Completed inserting records for orders,preferences and clicks in last 7 days table *********'def filters():print '********** Inserting records for filters in last 7 day table *********'conn = getDbConnection()cursor = conn.cursor()cursor.execute("select weightage from user_segmentation_weigthage where weightage_type='filters'")result = cursor.fetchall()filter_weight=result[0][0]todayDate=datetime.today().date()data = session.query((user_filters)).filter_by(type="brand").filter(user_filters.created.between(todayDate-timedelta(days=6),todayDate)).all()for d in data:if userMap.has_key(d.user_id):brandMap = userMap.get(d.user_id)else:brandMap = {}brands = d.filters.split('|')for brand in brands:if brandMap.has_key(brand):brandMap[brand] = brandMap.get(brand) + 1else:brandMap[brand] = 1userMap[d.user_id] = brandMapfor x, y in userMap.iteritems():for bra,cow in y.iteritems():user_id=xbrand=bracount=cowcategory_id=3weight=cow*filter_weightsource='filters'sql = "insert into userlinkssegmentation7days (user_id, brand, category_id, source, count,weight) values(%s, %s, %s, %s, %s,%s)"cursor.execute(sql,(user_id,brand, category_id, source, count,weight))conn.commit()conn.close()print '********** Completed inserting records for filters in last 7 day table *********'def storeProductView():print '********** Inserting records for store product view in last 7 day table *********'user_id=''brand=''category_id=''source=''count=''weight=''datesql= USER_STORE_PRODUCT_VIEWconn = getDbConnection()cursor = conn.cursor()cursor.execute(datesql)result = cursor.fetchall()for r in result:skuData = Mongo.getSkuBrandData(int(r[2]))if len(skuData) !=0:user_id=r[0]brand=skuData['brand']category_id =skuData['category_id']source=r[3]count=r[1]if category_id==3:if userMobileCategory.has_key(user_id):brandMap = userMobileCategory.get(user_id)else:brandMap = {}if 
brandMap.has_key(brand):brandMap[brand] = brandMap.get(brand) + r[1]else:brandMap[brand] = r[1]userMobileCategory[user_id] = brandMapif category_id==5:if userTabletCategory.has_key(user_id):brandCategoryMap = userTabletCategory.get(user_id)else:brandCategoryMap = {}if brandCategoryMap.has_key(brand):brandCategoryMap[brand] = brandCategoryMap.get(brand) + r[1]else:brandCategoryMap[brand] = r[1]userTabletCategory[user_id] = brandCategoryMapelse:print 'Sku bundle Id not present',r[2]cursor.execute("select weightage from user_segmentation_weigthage where weightage_type='product_view'")result = cursor.fetchall()product_view_weight=result[0][0]for x, y in userMobileCategory.iteritems():for bra,cow in y.iteritems():user_id=xbrand= bracount=cowweight=cow*product_view_weightsource='product_view'category_id=3sql = "insert into userlinkssegmentation7days (user_id, brand, category_id, source, count,weight) values(%s, %s, %s, %s, %s,%s)"cursor.execute(sql,(user_id,brand, category_id, source, count,weight))conn.commit()for x, y in userTabletCategory.iteritems():for bra,cow in y.iteritems():user_id=xbrand= bracount=cowweight=cow*product_view_weightsource='product_view'category_id=5sql = "insert into userlinkssegmentation7days (user_id, brand, category_id, source, count,weight) values(%s, %s, %s, %s, %s,%s)"cursor.execute(sql,(user_id,brand, category_id, source, count,weight))conn.commit()conn.close()print '********** Completed inserting records for store product view in last 7 day table *********'def findRepeatingOrders():print '********** Call to sanitize data for orders if any multiple *********'datesql= "select user_id,brand,category_id from userlinkssegmentation7days where source='orders' group by user_id,brand,category_id having count(source)=2;"conn = getDbConnection()cursor = conn.cursor()rows_count = cursor.execute(datesql)if rows_count>0:result = cursor.fetchall()for r in result:print rfindDetails(r[0],r[1],r[2])else:print 'No Data to be sanitized for repeating 
orders'def findDetails(userId,brand,category):datesql= "select sum(count),sum(weight) from userlinkssegmentation7days where user_id=%s and source='orders' and brand=%s group by source;"conn = getDbConnection()cursor = conn.cursor()cursor.execute(datesql,(userId,brand))result = cursor.fetchall()for r in result:deletesql="delete from userlinkssegmentation7days where user_id=%s and brand=%s and source='orders'"cursor = conn.cursor()cursor.execute(deletesql,(userId,brand))conn.commit()insertsql="insert into userlinkssegmentation7days (user_id, brand, category_id, source, count,weight) values(%s, %s, %s, %s, %s,%s)"cursor = conn.cursor()cursor.execute(insertsql,(userId,brand,category,'orders',r[0],r[1]))conn.commit()def findRepeatingClicks():print '********** Call to sanitize data for product view if any multiple *********'datesql= "select user_id,brand,category_id from userlinkssegmentation7days where source='product_view' group by user_id,brand,category_id having count(source)=2;"conn = getDbConnection()cursor = conn.cursor()rows_count = cursor.execute(datesql)if rows_count>0:result = cursor.fetchall()for r in result:print rfindViewDetails(r[0],r[1],r[2])else:print 'No Data to be sanitized for product view'def findViewDetails(userId,brand,category):datesql= "select sum(count),sum(weight) from userlinkssegmentation7days where user_id=%s and source='product_view' and brand=%s group by source;"conn = getDbConnection()cursor = conn.cursor()cursor.execute(datesql,(userId,brand))result = cursor.fetchall()for r in result:deletesql="delete from userlinkssegmentation7days where user_id=%s and brand=%s and source='product_view'"cursor = conn.cursor()cursor.execute(deletesql,(userId,brand))conn.commit()insertsql="insert into userlinkssegmentation7days (user_id, brand, category_id, source, count,weight) values(%s, %s, %s, %s, %s,%s)"cursor = conn.cursor()cursor.execute(insertsql,(userId,brand,category,'product_view',r[0],r[1]))conn.commit()def checkCountForUpdate():print 
'********** Checking users brand whose cumulative weight is greater than 3 *********'datesql= "select user_id,brand,category_id from userlinkssegmentation7days group by user_id,brand having sum(weight) >=3;"conn = getDbConnection()cursor = conn.cursor()cursor.execute(datesql)result = cursor.fetchall()for r in result:checkUserForUpdate(r[0],r[1],r[2])conn.close()def checkUserForUpdate(userId,brand,category):print '********** Checking users brand present or not in the master table *********'datesql= "select distinct brand from userlinkssegmentation where user_id=%s and brand=%s and category_id=%s"conn = getDbConnection()cursor = conn.cursor()cursor.execute(datesql,(userId,brand,category))result = cursor.fetchall()if len(result)==0:insertNewUserLinkDetails(userId,brand,category)else:userReplaceData(userId,brand,category)conn.close()def userReplaceData(userId,brand,category):print 'Data that needs to be updated for', userId,brand,categoryconn = getDbConnection()cursor = conn.cursor()cursor.execute(REPLACE_QUERY,(userId,brand,category))conn.commit()conn.close()def insertNewUserLinkDetails(userId,brand,category):print 'Data needs to be inserted as the brand was not present earlier', userId,brand,categoryconn = getDbConnection()selectNewUser="select * from userlinkssegmentation7days where user_id=%s and brand=%s and category_id=%s"cursor = conn.cursor()cursor.execute(selectNewUser,(userId,brand,category))result = cursor.fetchall()print 'Result while inserting new record in master',resultfor r in result:sql = "insert into userlinkssegmentation (user_id, brand, category_id, source, count,weight) values(%s, %s, %s, %s, %s,%s)"cursor.execute(sql,(r[0],r[1],r[2], r[3], r[4],r[5]))conn.commit()conn.close()def main():print datetime.now()print '********** Starting to update links for Users *********'deleteUserForLinks()addUsersForLinks()filters()storeProductView()findRepeatingOrders()findRepeatingClicks()checkCountForUpdate()print '********** Finished updating records for user 
links *********'if __name__=='__main__':main()