Subversion Repositories SmartDukaan

Rev

Rev 19655 | Rev 20775 | Go to most recent revision | Details | Compare with Previous | Last modification | View Log | RSS feed

Rev Author Line No. Line
19095 manish.sha 1
#!/usr/bin/python
2
 
3
import threading
4
import time
5
 
6
import MySQLdb
7
from urlparse import urlparse
8
from urlparse import parse_qs
9
import requests
10
import json
11
import optparse
12
import urllib2
13
import base64
14
import urllib
15
import logging
16
from dtr.utils.utils import get_mongo_connection, to_java_date
17
from datetime import datetime
19655 manish.sha 18
import traceback
19095 manish.sha 19
 
20173 kshitij.so 20
parser = optparse.OptionParser()
21
parser.add_option("-C", "--chunksize", dest="chunksize",
22
                  default="100",
23
                  type="int", help="The requsets a single thread handles",
24
                  metavar="CHUNKSIZE")
25
parser.add_option("-M", "--mongoHost", dest="mongoHost",
26
                  default="localhost",
27
                  type="str", help="The requsets a single thread handles",
28
                  metavar="MONGOHOST")
29
parser.add_option("-m", "--mysqlHost", dest="mysqlHost",
30
                  default="localhost",
31
                  type="str", help="The requsets a single thread handles",
32
                  metavar="MONGOHOST")
33
 
34
(options, args) = parser.parse_args()
35
 
19095 manish.sha 36
GCM_URL = "https://android.googleapis.com/gcm/send"
20173 kshitij.so 37
GOOGLE_API_KEY = "AIzaSyBGL29sWCKm8OweP62ywrfv4jv4VIN8u0o"
19095 manish.sha 38
headers = {'content-type':'application/json', "authorization":"key=" + GOOGLE_API_KEY}
39
aff_url_headers = { 
40
            'User-agent':'Mozilla/5.0 (X11; Linux i686) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/37.0.2062.120 Safari/537.36',
41
            'Accept' : 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',      
42
            'Accept-Language' : 'en-US,en;q=0.8',                     
43
            'Accept-Charset' : 'ISO-8859-1,utf-8;q=0.7,*;q=0.3',
44
            'Connection':'keep-alive'
45
        }
46
 
20173 kshitij.so 47
pendingNotificationSet = {'_id':1,'user_id':1,'gcm_id':1,'notification_campaign_id':1}
48
pendingCampaignSet = {'_id':1,'name':1,'title':1,'message':1,'type':1,'url':1,'sql':1,'expiresat':1} 
49
 
50
pendingNotificationEntryMap = {}
19095 manish.sha 51
campaignUsersMap = {}
52
notificationCampaignsMap = {}
53
userGcmRegIdMap = {}
20173 kshitij.so 54
gcmIdGcmRegIdMap = {}
55
domainStoresMap = {}
56
failedGcm = []
57
 
58
 
59
db = MySQLdb.connect(options.mysqlHost,"root","shop2020","dtr" )
19095 manish.sha 60
cursor = db.cursor()
61
 
62
ALL_STORES_SQL = "select * from stores"
19218 manish.sha 63
GCM_REG_ID_SQL1 = "select x.user_id, x.gcm_regid, x.id, x.androidid, x.created  from (select * from gcm_users where user_id in (%s) and androidid is not null order by id desc) as x group by x.user_id, x.gcm_regid, x.androidid"
19232 manish.sha 64
GCM_REG_ID_SQL2 = "select x.user_id, x.gcm_regid, x.id, x.androidid, x.created, x.imeinumber  from (select * from gcm_users where user_id in (%s) and androidid is null and imeinumber is not null order by id desc) as x group by x.user_id, x.imeinumber"
20173 kshitij.so 65
FETCH_GCM_REG_SQL = "select id, gcm_regid from gcm_users where id in (%s)"
19104 manish.sha 66
 
19095 manish.sha 67
cursor.execute(ALL_STORES_SQL)
68
result_stores = cursor.fetchall()
69
domainStoresMap = {}
70
for rec in result_stores:
71
    domainStoresMap[rec[2]] = rec
72
 
73
logging.basicConfig(level=logging.DEBUG,
74
                    format='[%(levelname)s] (%(threadName)-10s) %(message)s',
75
                    )
76
 
20173 kshitij.so 77
class __PushNotification:
78
    def __init__(self, notification_campaign_id, user_id, message, type, sms_type, sms_id, android_id, pushed_by, sent_timestamp, receive_timestamp, open_timestamp, sms_timestamp, gcm_id, created_timestamp, gcm_expired, notification_accounted):
79
        self.notification_campaign_id = notification_campaign_id
80
        self.user_id = user_id
81
        self.message = message
82
        self.type = type
83
        self.sms_type = sms_type
84
        self.sms_id = sms_id
85
        self.android_id = android_id
86
        self.pushed_by = pushed_by
87
        self.sent_timestamp = sent_timestamp
88
        self.receive_timestamp = receive_timestamp
89
        self.open_timestamp = open_timestamp
90
        self.sms_timestamp = sms_timestamp
91
        self.gcm_id = gcm_id
92
        self.created_timestamp = created_timestamp
93
        self.gcm_expired = gcm_expired
94
        self.notification_accounted = notification_accounted
95
 
96
 
97
 
98
def populatePendingNotificationEntriesToBeSent():
99
    global pendingNotificationEntryMap
100
    toRefetchGcmId = []
101
    offset = 0
102
    while(True):
103
        pendingNotificationEntries = list(get_mongo_connection(host=options.mongoHost).User.pushnotifications.find({'type':'pending'}, pendingNotificationSet).skip(offset).limit(10000))
104
        if not pendingNotificationEntries:
105
            break
106
        for entry in pendingNotificationEntries:
107
            gcm_regid = gcmIdGcmRegIdMap.get(entry.get('gcm_id'))
108
            if gcm_regid is None:
109
                toRefetchGcmId.append(entry.get('gcm_id'))
110
            if pendingNotificationEntryMap.has_key(entry['notification_campaign_id']):
111
                entries = pendingNotificationEntryMap.get(entry['notification_campaign_id'])
112
                entries.append(entry)
113
                pendingNotificationEntryMap[entry['notification_campaign_id']] = entries
114
            else:
115
                entries = []
116
                entries.append(entry)
117
                pendingNotificationEntryMap[entry['notification_campaign_id']] = entries
118
        offset = offset + 10000
119
    logging.debug("FETCH_GCM_REG_SQL:- "+FETCH_GCM_REG_SQL%(",".join(map(str,toRefetchGcmId))))
120
    if not toRefetchGcmId:
121
        print "No gcm to refetch"
122
        return
123
    cursor.execute(FETCH_GCM_REG_SQL%(",".join(map(str,toRefetchGcmId))))
124
    result_data = cursor.fetchall()
125
    if result_data:
126
        for data in result_data:
127
            gcmIdGcmRegIdMap[data[0]] = data[1]
128
 
129
 
130
 
131
def populateCampaignsMap(pendingCampaigns):
132
    global campaignUsersMap
133
    global notificationCampaignsMap
134
    for campaign in pendingCampaigns:
135
        cursor.execute(str(campaign['sql']))
136
        user_records = cursor.fetchall()
137
        userids = []
138
        for record in user_records:
139
            if str(record[0]) not in userids:
140
                userids.append(str(record[0]))
141
        campaignUsersMap[long(campaign['_id'])]=userids
142
        notificationCampaignsMap[long(campaign['_id'])] = campaign
143
 
144
def getPendingCampaigns():
145
    campaigns = list(get_mongo_connection(host=options.mongoHost).User.notificationcampaigns.find({'notification_processed':0,'expiresat':{'$gte':to_java_date(datetime.now())}},pendingCampaignSet))
146
    return campaigns
147
 
148
def insertPushNotificationEntriesToSent():
149
    global userGcmRegIdMap
150
    global gcmIdGcmRegIdMap
151
    for campaignId, user_list in campaignUsersMap.items():
152
        if len(user_list)==0:
153
            continue
154
        logging.debug("GCM_REG_SQL_1:- "+GCM_REG_ID_SQL1%(",".join(map(str,user_list))))
155
        cursor.execute(GCM_REG_ID_SQL1%(",".join(map(str,user_list))))
156
        result_data = cursor.fetchall()
157
 
158
        if result_data and len(result_data)>0:
159
 
160
            for dataRec in result_data:
161
 
162
                gcmIdGcmRegIdMap[dataRec[2]] = dataRec[1]
163
 
164
                if userGcmRegIdMap.has_key(dataRec[0]):
165
                    gcmRegIdMap = userGcmRegIdMap.get(dataRec[0])
166
                    gcmRegIdMap[dataRec[2]]= {'gcm_regid':dataRec[1],'android_id':dataRec[3]}
167
                    userGcmRegIdMap[dataRec[0]] = gcmRegIdMap
168
                else:
169
                    userGcmRegIdMap[dataRec[0]] = {dataRec[2]:{'gcm_regid':dataRec[1],'android_id':dataRec[3]}}
170
 
171
        logging.debug("GCM_REG_SQL_2:- "+GCM_REG_ID_SQL2%(",".join(map(str,user_list))))       
172
        cursor.execute(GCM_REG_ID_SQL2%(",".join(map(str,user_list))))
173
        result_data = cursor.fetchall()
174
        for dataRec in result_data:
175
 
176
            gcmIdGcmRegIdMap[dataRec[2]] = dataRec[1]
177
 
178
            if userGcmRegIdMap.has_key(dataRec[0]):
179
                gcmRegIdMap = userGcmRegIdMap.get(dataRec[0])
180
                gcmRegIdMap[dataRec[2]]= {'gcm_regid':dataRec[1],'android_id':dataRec[3]}
181
                userGcmRegIdMap[dataRec[0]] = gcmRegIdMap
182
            else:
183
                userGcmRegIdMap[dataRec[0]] = {dataRec[2]:{'gcm_regid':dataRec[1],'android_id':None}}
184
 
185
 
186
    for campaignId, userList in campaignUsersMap.items():
187
        bulkArrayPushNotificationObj = []
188
        bulkCount = 0
189
        for userId in userList:
190
            gcmRegIdMap = userGcmRegIdMap.get(long(userId))
191
            if gcmRegIdMap is None:
192
                logging.debug("No value exist in gcm map for user_id"+str(userId))
193
                continue
194
            for gcm_id, details in gcmRegIdMap.items():
195
                android_id = None
196
                if details.has_key('android_id'):
197
                    android_id = details['android_id']
198
                pushNotificationObj = __PushNotification(long(campaignId), long(userId), None, 'pending', \
199
                                    None, None, android_id, "php", None, None, None, None, gcm_id, to_java_date(datetime.now()),0,0)
200
                bulkArrayPushNotificationObj.append(pushNotificationObj.__dict__)
201
                bulkCount = bulkCount + 1
202
            if bulkCount >= 10000:
203
                get_mongo_connection(host=options.mongoHost).User.pushnotifications.insert(bulkArrayPushNotificationObj)
204
                bulkCount = 0
205
                bulkArrayPushNotificationObj = []
206
 
207
        if bulkArrayPushNotificationObj:
208
            get_mongo_connection(host=options.mongoHost).User.pushnotifications.insert(bulkArrayPushNotificationObj)
209
 
210
def initiateNotificationThreadProcess(chunkSize):
211
    count = 1
212
    logging.debug('Starting Push Notification Job....'+str(datetime.now()))
213
    for entries in pendingNotificationEntryMap.values():
214
        campaign_receivers_list = list(chunks(entries, chunkSize))
215
        for sublist in campaign_receivers_list:
216
            thread = NotificationThread(count, "Thread-"+str(count), sublist)
217
            thread.start()
218
            thread.join()
219
            count = count +1
220
    logging.debug('Stopping Push Notification Job....'+str(datetime.now()))
221
 
19095 manish.sha 222
class NotificationThread (threading.Thread):
223
    def __init__(self, threadID, name, recordsList):
224
        threading.Thread.__init__(self)
225
        self.threadID = threadID
226
        self.name = name
227
        self.recordsList = recordsList
20173 kshitij.so 228
 
19095 manish.sha 229
    def run(self):
230
        logging.debug('Starting')
231
        handleCampaignRequest(self.name, self.recordsList)
232
        logging.debug('Completed')
233
 
234
def handleCampaignRequest(threadName, recordsList ):
20173 kshitij.so 235
    global failedGcm
19095 manish.sha 236
    for record in recordsList:
19655 manish.sha 237
        try:
20173 kshitij.so 238
            #userGcmRegIdDetails = userGcmRegIdMap.get(long(record.get('user_id')))
19655 manish.sha 239
            campaign = notificationCampaignsMap.get(long(record.get('notification_campaign_id'))) 
240
            gcm_id = record.get('gcm_id')
20173 kshitij.so 241
            gcm_regid = gcmIdGcmRegIdMap.get(gcm_id)
242
            if gcm_regid is None:
243
                print "Ideally should never raise"
244
                raise
19655 manish.sha 245
        except:
246
            logging.debug('Error while getting GCM Details for User Id:- '+ str(record.get('user_id'))+" and Notification Id:- "+str(record.get('notification_campaign_id')))
247
            traceback.print_exc() 
248
            continue
19134 manish.sha 249
        result_url = ""
250
        if campaign.get('type')=='url':
251
            parsed_uri = urlparse(campaign.get('url'))
252
            domain = '{uri.netloc}'.format(uri=parsed_uri)
253
            store = domainStoresMap.get(domain)
254
            if store is not None:
255
                url_params = { 'url' : campaign.get('url'),  'userId' : record.get('user_id'), 'storeId' : store[0] }
256
                encoded_url_params = urllib.urlencode(url_params)
257
                DTR_API_BASIC_AUTH = base64.encodestring('%s:%s' % ("dtr", "dtr18Feb2015")).replace('\n', '')
258
 
259
                pushpostrequest = urllib2.Request('http://api.profittill.com/pushnotifications/generateAffiliateUrl', encoded_url_params, headers=aff_url_headers)
260
                pushpostrequest.add_header("Authorization", "Basic %s" % DTR_API_BASIC_AUTH)
261
                json_result =  json.loads(urllib2.urlopen(pushpostrequest).read())
262
                result_url = json_result['url']
263
            else:
264
                queryString = urlparse(campaign.get('url').strip()).query
265
                parsed_url = parse_qs(queryString)
266
                if not parsed_url.has_key('user_id'):
267
                    if len(queryString)>0:
268
                        result_url = campaign.get('url').strip()+'&user_id='+str(record.get('user_id'))
269
                    else:
270
                        result_url = campaign.get('url').strip()+'?user_id='+str(record.get('user_id'))
19095 manish.sha 271
                else:
20173 kshitij.so 272
                    result_url = campaign.get('url').strip()
19134 manish.sha 273
        if campaign.get('url') is None or str(campaign.get('url'))=='':
274
            result_url = 'http://api.profittill.com/deals?user_id='+str(record.get('user_id'))
275
        data = {"message":campaign.get('message'),"cid":str(campaign.get('_id'))+"_"+str(record.get('_id')),"title":campaign.get('title'),
276
                "type":campaign.get('type'),"url":result_url.strip(),"vibrate":1,"sound":1,"largeIcon":"large_icon",
277
                "smallIcon":"small_icon","priority":"high","time_to_live":long(campaign.get('expiresat'))-long(time.mktime(datetime.now().timetuple()))}
278
 
279
        post_data = {}
280
 
281
        post_data['data'] = data
282
        regIds = []
20173 kshitij.so 283
        regIds.append(gcm_regid)
19134 manish.sha 284
        post_data['registration_ids'] = regIds
285
 
286
        post_data_json = json.dumps(post_data)
287
        logging.debug('User Id:- '+str(record.get('user_id'))+' Post Data Json :- '+str(post_data_json))
288
        response = requests.post(GCM_URL, data=post_data_json, headers=headers)
289
        logging.debug('User Id:- '+str(record.get('user_id'))+' GCM_ID:- '+str(gcm_id)+' Response :-'+str(response.text))
290
        result = json.loads(response.text)
291
        if result["success"]:
20173 kshitij.so 292
            get_mongo_connection(host=options.mongoHost).User.pushnotifications.update({'_id':record.get('_id')},{"$set":{'message':'success','type':'sent','sent_timestamp':to_java_date(datetime.now())}})
19134 manish.sha 293
        else:
20173 kshitij.so 294
            get_mongo_connection(host=options.mongoHost).User.pushnotifications.update({'_id':record.get('_id')},{"$set":{'message':result["results"][0]["error"],'type':'failed','sent_timestamp':to_java_date(datetime.now())}})
295
            failedGcm.append(gcm_id)
296
#             updateGcmUserSql = "update gcm_users set failurecount=failurecount+1 where gcm_regid='%s'"%(gcm_regid)
297
#             logging.debug('Update GCM User Query :-'+str(updateGcmUserSql))
298
#             try:
299
#                 dtrdb = MySQLdb.connect('localhost',"root","shop2020","dtr" )
300
#                 cursor = dtrdb.cursor()
301
#                 cursor.execute(updateGcmUserSql)
302
#                 dtrdb.commit()
303
#                 session.commit()
304
#                 dtrdb.close()
305
#             except:
306
#                 dtrdb.rollback()
307
#                 dtrdb.close()
19095 manish.sha 308
 
309
def chunks(l, n):
310
    """Yield successive n-sized chunks from l."""
311
    for i in xrange(0, len(l), n):
312
        yield l[i:i+n]
313
 
19104 manish.sha 314
def markNotificationCampaignsProcessed():
315
    for campaign in notificationCampaignsMap.values():
19125 manish.sha 316
        logging.debug('Notification Campaign....'+str(campaign.get('_id'))+"...Marked Processed. "+str(datetime.now()))
20173 kshitij.so 317
        get_mongo_connection(host=options.mongoHost).User.notificationcampaigns.update({'_id':campaign.get('_id')},{"$set":{'notification_processed':1}})
318
 
319
def updateFailedGcm():
320
    updateGcmUserSql = "update gcm_users set failurecount=failurecount+1 where id in (%s)"
321
    logging.debug("updateGcmUserSql:- "+updateGcmUserSql%(",".join(map(str,failedGcm))))
322
    if not failedGcm:
323
        return
324
    cursor.execute(updateGcmUserSql%(",".join(map(str,failedGcm))))
19104 manish.sha 325
 
20173 kshitij.so 326
    try:
327
        db.commit()
328
    except:
329
        traceback.print_exc()
330
        db.rollback()
331
 
332
 
19095 manish.sha 333
def main():
20173 kshitij.so 334
    try:
335
        pendingCampaigns = getPendingCampaigns()
336
        populateCampaignsMap(pendingCampaigns)
337
        insertPushNotificationEntriesToSent()
338
        markNotificationCampaignsProcessed()
339
        populatePendingNotificationEntriesToBeSent()
340
        initiateNotificationThreadProcess(options.chunksize)
341
        updateFailedGcm()
342
    except:
343
        traceback.print_exc()
344
    finally:
345
        db.close()
19095 manish.sha 346
 
20173 kshitij.so 347
 
348
if __name__ == '__main__':
19095 manish.sha 349
    main()
350