Subversion Repositories SmartDukaan

Rev

Rev 19655 | Rev 20775 | Go to most recent revision | Show entire file | Ignore whitespace | Details | Blame | Last modification | View Log | RSS feed

Rev 19655 Rev 20173
Line 2... Line 2...
2
 
2
 
3
import threading
3
import threading
4
import time
4
import time
5
 
5
 
6
import MySQLdb
6
import MySQLdb
7
from elixir import *
-
 
8
from sqlalchemy.orm.exc import NoResultFound, MultipleResultsFound
-
 
9
from sqlalchemy.sql import func
-
 
10
from sqlalchemy.sql.expression import and_, or_, desc, not_, distinct, cast, \
-
 
11
    between
-
 
12
from urlparse import urlparse
7
from urlparse import urlparse
13
from urlparse import parse_qs
8
from urlparse import parse_qs
14
import requests
9
import requests
15
import json
10
import json
16
import optparse
11
import optparse
Line 20... Line 15...
20
import logging
15
import logging
21
from dtr.utils.utils import get_mongo_connection, to_java_date
16
from dtr.utils.utils import get_mongo_connection, to_java_date
22
from datetime import datetime
17
from datetime import datetime
23
import traceback
18
import traceback
24
 
19
 
-
 
20
parser = optparse.OptionParser()
-
 
21
parser.add_option("-C", "--chunksize", dest="chunksize",
-
 
22
                  default="100",
-
 
23
                  type="int", help="The requsets a single thread handles",
-
 
24
                  metavar="CHUNKSIZE")
-
 
25
parser.add_option("-M", "--mongoHost", dest="mongoHost",
-
 
26
                  default="localhost",
-
 
27
                  type="str", help="The requsets a single thread handles",
-
 
28
                  metavar="MONGOHOST")
-
 
29
parser.add_option("-m", "--mysqlHost", dest="mysqlHost",
-
 
30
                  default="localhost",
-
 
31
                  type="str", help="The requsets a single thread handles",
-
 
32
                  metavar="MONGOHOST")
-
 
33
 
-
 
34
(options, args) = parser.parse_args()
-
 
35
 
25
GCM_URL = "https://android.googleapis.com/gcm/send"
36
GCM_URL = "https://android.googleapis.com/gcm/send"
26
GOOGLE_API_KEY = "AIzaSyDw1qBnmxtnfR9NqBewryQ-yo3cG2ravGM"
37
GOOGLE_API_KEY = "AIzaSyBGL29sWCKm8OweP62ywrfv4jv4VIN8u0o"
27
headers = {'content-type':'application/json', "authorization":"key=" + GOOGLE_API_KEY}
38
headers = {'content-type':'application/json', "authorization":"key=" + GOOGLE_API_KEY}
28
aff_url_headers = { 
39
aff_url_headers = { 
29
            'User-agent':'Mozilla/5.0 (X11; Linux i686) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/37.0.2062.120 Safari/537.36',
40
            'User-agent':'Mozilla/5.0 (X11; Linux i686) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/37.0.2062.120 Safari/537.36',
30
            'Accept' : 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',      
41
            'Accept' : 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',      
31
            'Accept-Language' : 'en-US,en;q=0.8',                     
42
            'Accept-Language' : 'en-US,en;q=0.8',                     
32
            'Accept-Charset' : 'ISO-8859-1,utf-8;q=0.7,*;q=0.3',
43
            'Accept-Charset' : 'ISO-8859-1,utf-8;q=0.7,*;q=0.3',
33
            'Connection':'keep-alive'
44
            'Connection':'keep-alive'
34
        }
45
        }
35
 
46
 
-
 
47
pendingNotificationSet = {'_id':1,'user_id':1,'gcm_id':1,'notification_campaign_id':1}
-
 
48
pendingCampaignSet = {'_id':1,'name':1,'title':1,'message':1,'type':1,'url':1,'sql':1,'expiresat':1} 
-
 
49
 
36
mongoHost = 'localhost' 
50
pendingNotificationEntryMap = {}
37
campaignUsersMap = {}
51
campaignUsersMap = {}
38
notificationCampaignsMap = {}
52
notificationCampaignsMap = {}
39
pendingNotificationEntryMap = {}
-
 
40
userGcmRegIdMap = {}
53
userGcmRegIdMap = {}
-
 
54
gcmIdGcmRegIdMap = {}
-
 
55
domainStoresMap = {}
-
 
56
failedGcm = []
-
 
57
 
-
 
58
 
41
db = MySQLdb.connect('localhost',"root","shop2020","dtr" )
59
db = MySQLdb.connect(options.mysqlHost,"root","shop2020","dtr" )
42
cursor = db.cursor()
60
cursor = db.cursor()
43
 
61
 
44
ALL_STORES_SQL = "select * from stores"
62
ALL_STORES_SQL = "select * from stores"
45
GCM_REG_ID_SQL1 = "select x.user_id, x.gcm_regid, x.id, x.androidid, x.created  from (select * from gcm_users where user_id in (%s) and androidid is not null order by id desc) as x group by x.user_id, x.gcm_regid, x.androidid"
63
GCM_REG_ID_SQL1 = "select x.user_id, x.gcm_regid, x.id, x.androidid, x.created  from (select * from gcm_users where user_id in (%s) and androidid is not null order by id desc) as x group by x.user_id, x.gcm_regid, x.androidid"
46
GCM_REG_ID_SQL2 = "select x.user_id, x.gcm_regid, x.id, x.androidid, x.created, x.imeinumber  from (select * from gcm_users where user_id in (%s) and androidid is null and imeinumber is not null order by id desc) as x group by x.user_id, x.imeinumber"
64
GCM_REG_ID_SQL2 = "select x.user_id, x.gcm_regid, x.id, x.androidid, x.created, x.imeinumber  from (select * from gcm_users where user_id in (%s) and androidid is null and imeinumber is not null order by id desc) as x group by x.user_id, x.imeinumber"
-
 
65
FETCH_GCM_REG_SQL = "select id, gcm_regid from gcm_users where id in (%s)"
47
 
66
 
48
cursor.execute(ALL_STORES_SQL)
67
cursor.execute(ALL_STORES_SQL)
49
result_stores = cursor.fetchall()
68
result_stores = cursor.fetchall()
50
domainStoresMap = {}
69
domainStoresMap = {}
51
for rec in result_stores:
70
for rec in result_stores:
Line 53... Line 72...
53
    
72
    
54
logging.basicConfig(level=logging.DEBUG,
73
logging.basicConfig(level=logging.DEBUG,
55
                    format='[%(levelname)s] (%(threadName)-10s) %(message)s',
74
                    format='[%(levelname)s] (%(threadName)-10s) %(message)s',
56
                    )
75
                    )
57
 
76
 
-
 
77
class __PushNotification:
-
 
78
    def __init__(self, notification_campaign_id, user_id, message, type, sms_type, sms_id, android_id, pushed_by, sent_timestamp, receive_timestamp, open_timestamp, sms_timestamp, gcm_id, created_timestamp, gcm_expired, notification_accounted):
-
 
79
        self.notification_campaign_id = notification_campaign_id
-
 
80
        self.user_id = user_id
-
 
81
        self.message = message
-
 
82
        self.type = type
-
 
83
        self.sms_type = sms_type
-
 
84
        self.sms_id = sms_id
-
 
85
        self.android_id = android_id
-
 
86
        self.pushed_by = pushed_by
-
 
87
        self.sent_timestamp = sent_timestamp
-
 
88
        self.receive_timestamp = receive_timestamp
-
 
89
        self.open_timestamp = open_timestamp
-
 
90
        self.sms_timestamp = sms_timestamp
-
 
91
        self.gcm_id = gcm_id
-
 
92
        self.created_timestamp = created_timestamp
-
 
93
        self.gcm_expired = gcm_expired
-
 
94
        self.notification_accounted = notification_accounted
-
 
95
 
-
 
96
 
-
 
97
 
-
 
98
def populatePendingNotificationEntriesToBeSent():
-
 
99
    global pendingNotificationEntryMap
-
 
100
    toRefetchGcmId = []
-
 
101
    offset = 0
-
 
102
    while(True):
-
 
103
        pendingNotificationEntries = list(get_mongo_connection(host=options.mongoHost).User.pushnotifications.find({'type':'pending'}, pendingNotificationSet).skip(offset).limit(10000))
-
 
104
        if not pendingNotificationEntries:
-
 
105
            break
-
 
106
        for entry in pendingNotificationEntries:
-
 
107
            gcm_regid = gcmIdGcmRegIdMap.get(entry.get('gcm_id'))
-
 
108
            if gcm_regid is None:
-
 
109
                toRefetchGcmId.append(entry.get('gcm_id'))
-
 
110
            if pendingNotificationEntryMap.has_key(entry['notification_campaign_id']):
-
 
111
                entries = pendingNotificationEntryMap.get(entry['notification_campaign_id'])
-
 
112
                entries.append(entry)
-
 
113
                pendingNotificationEntryMap[entry['notification_campaign_id']] = entries
-
 
114
            else:
-
 
115
                entries = []
-
 
116
                entries.append(entry)
-
 
117
                pendingNotificationEntryMap[entry['notification_campaign_id']] = entries
-
 
118
        offset = offset + 10000
-
 
119
    logging.debug("FETCH_GCM_REG_SQL:- "+FETCH_GCM_REG_SQL%(",".join(map(str,toRefetchGcmId))))
-
 
120
    if not toRefetchGcmId:
-
 
121
        print "No gcm to refetch"
-
 
122
        return
-
 
123
    cursor.execute(FETCH_GCM_REG_SQL%(",".join(map(str,toRefetchGcmId))))
-
 
124
    result_data = cursor.fetchall()
-
 
125
    if result_data:
-
 
126
        for data in result_data:
-
 
127
            gcmIdGcmRegIdMap[data[0]] = data[1]
-
 
128
    
-
 
129
    
-
 
130
 
-
 
131
def populateCampaignsMap(pendingCampaigns):
-
 
132
    global campaignUsersMap
-
 
133
    global notificationCampaignsMap
-
 
134
    for campaign in pendingCampaigns:
-
 
135
        cursor.execute(str(campaign['sql']))
-
 
136
        user_records = cursor.fetchall()
-
 
137
        userids = []
-
 
138
        for record in user_records:
-
 
139
            if str(record[0]) not in userids:
-
 
140
                userids.append(str(record[0]))
-
 
141
        campaignUsersMap[long(campaign['_id'])]=userids
-
 
142
        notificationCampaignsMap[long(campaign['_id'])] = campaign
-
 
143
    
-
 
144
def getPendingCampaigns():
-
 
145
    campaigns = list(get_mongo_connection(host=options.mongoHost).User.notificationcampaigns.find({'notification_processed':0,'expiresat':{'$gte':to_java_date(datetime.now())}},pendingCampaignSet))
-
 
146
    return campaigns
-
 
147
 
-
 
148
def insertPushNotificationEntriesToSent():
-
 
149
    global userGcmRegIdMap
-
 
150
    global gcmIdGcmRegIdMap
-
 
151
    for campaignId, user_list in campaignUsersMap.items():
-
 
152
        if len(user_list)==0:
-
 
153
            continue
-
 
154
        logging.debug("GCM_REG_SQL_1:- "+GCM_REG_ID_SQL1%(",".join(map(str,user_list))))
-
 
155
        cursor.execute(GCM_REG_ID_SQL1%(",".join(map(str,user_list))))
-
 
156
        result_data = cursor.fetchall()
-
 
157
        
-
 
158
        if result_data and len(result_data)>0:
-
 
159
                
-
 
160
            for dataRec in result_data:
-
 
161
                
-
 
162
                gcmIdGcmRegIdMap[dataRec[2]] = dataRec[1]
-
 
163
                
-
 
164
                if userGcmRegIdMap.has_key(dataRec[0]):
-
 
165
                    gcmRegIdMap = userGcmRegIdMap.get(dataRec[0])
-
 
166
                    gcmRegIdMap[dataRec[2]]= {'gcm_regid':dataRec[1],'android_id':dataRec[3]}
-
 
167
                    userGcmRegIdMap[dataRec[0]] = gcmRegIdMap
-
 
168
                else:
-
 
169
                    userGcmRegIdMap[dataRec[0]] = {dataRec[2]:{'gcm_regid':dataRec[1],'android_id':dataRec[3]}}
-
 
170
            
-
 
171
        logging.debug("GCM_REG_SQL_2:- "+GCM_REG_ID_SQL2%(",".join(map(str,user_list))))       
-
 
172
        cursor.execute(GCM_REG_ID_SQL2%(",".join(map(str,user_list))))
-
 
173
        result_data = cursor.fetchall()
-
 
174
        for dataRec in result_data:
-
 
175
            
-
 
176
            gcmIdGcmRegIdMap[dataRec[2]] = dataRec[1]
-
 
177
            
-
 
178
            if userGcmRegIdMap.has_key(dataRec[0]):
-
 
179
                gcmRegIdMap = userGcmRegIdMap.get(dataRec[0])
-
 
180
                gcmRegIdMap[dataRec[2]]= {'gcm_regid':dataRec[1],'android_id':dataRec[3]}
-
 
181
                userGcmRegIdMap[dataRec[0]] = gcmRegIdMap
-
 
182
            else:
-
 
183
                userGcmRegIdMap[dataRec[0]] = {dataRec[2]:{'gcm_regid':dataRec[1],'android_id':None}}
-
 
184
                    
-
 
185
        
-
 
186
    for campaignId, userList in campaignUsersMap.items():
-
 
187
        bulkArrayPushNotificationObj = []
-
 
188
        bulkCount = 0
-
 
189
        for userId in userList:
-
 
190
            gcmRegIdMap = userGcmRegIdMap.get(long(userId))
-
 
191
            if gcmRegIdMap is None:
-
 
192
                logging.debug("No value exist in gcm map for user_id"+str(userId))
-
 
193
                continue
-
 
194
            for gcm_id, details in gcmRegIdMap.items():
-
 
195
                android_id = None
-
 
196
                if details.has_key('android_id'):
-
 
197
                    android_id = details['android_id']
-
 
198
                pushNotificationObj = __PushNotification(long(campaignId), long(userId), None, 'pending', \
-
 
199
                                    None, None, android_id, "php", None, None, None, None, gcm_id, to_java_date(datetime.now()),0,0)
-
 
200
                bulkArrayPushNotificationObj.append(pushNotificationObj.__dict__)
-
 
201
                bulkCount = bulkCount + 1
-
 
202
            if bulkCount >= 10000:
-
 
203
                get_mongo_connection(host=options.mongoHost).User.pushnotifications.insert(bulkArrayPushNotificationObj)
-
 
204
                bulkCount = 0
-
 
205
                bulkArrayPushNotificationObj = []
-
 
206
                
-
 
207
        if bulkArrayPushNotificationObj:
-
 
208
            get_mongo_connection(host=options.mongoHost).User.pushnotifications.insert(bulkArrayPushNotificationObj)
-
 
209
 
-
 
210
def initiateNotificationThreadProcess(chunkSize):
-
 
211
    count = 1
-
 
212
    logging.debug('Starting Push Notification Job....'+str(datetime.now()))
-
 
213
    for entries in pendingNotificationEntryMap.values():
-
 
214
        campaign_receivers_list = list(chunks(entries, chunkSize))
-
 
215
        for sublist in campaign_receivers_list:
-
 
216
            thread = NotificationThread(count, "Thread-"+str(count), sublist)
-
 
217
            thread.start()
-
 
218
            thread.join()
-
 
219
            count = count +1
-
 
220
    logging.debug('Stopping Push Notification Job....'+str(datetime.now()))
-
 
221
 
58
class NotificationThread (threading.Thread):
222
class NotificationThread (threading.Thread):
59
    def __init__(self, threadID, name, recordsList):
223
    def __init__(self, threadID, name, recordsList):
60
        threading.Thread.__init__(self)
224
        threading.Thread.__init__(self)
61
        self.threadID = threadID
225
        self.threadID = threadID
62
        self.name = name
226
        self.name = name
63
        self.recordsList = recordsList
227
        self.recordsList = recordsList
-
 
228
        
64
    def run(self):
229
    def run(self):
65
        logging.debug('Starting')
230
        logging.debug('Starting')
66
        handleCampaignRequest(self.name, self.recordsList)
231
        handleCampaignRequest(self.name, self.recordsList)
67
        logging.debug('Completed')
232
        logging.debug('Completed')
68
 
233
 
69
def handleCampaignRequest(threadName, recordsList ):
234
def handleCampaignRequest(threadName, recordsList ):
-
 
235
    global failedGcm
70
    for record in recordsList:
236
    for record in recordsList:
71
        try:
237
        try:
72
            userGcmRegIdDetails = userGcmRegIdMap.get(long(record.get('user_id')))
238
            #userGcmRegIdDetails = userGcmRegIdMap.get(long(record.get('user_id')))
73
            campaign = notificationCampaignsMap.get(long(record.get('notification_campaign_id'))) 
239
            campaign = notificationCampaignsMap.get(long(record.get('notification_campaign_id'))) 
74
            gcm_id = record.get('gcm_id')
240
            gcm_id = record.get('gcm_id')
75
            detailsMap = userGcmRegIdDetails.get(gcm_id)
241
            gcm_regid = gcmIdGcmRegIdMap.get(gcm_id)
-
 
242
            if gcm_regid is None:
-
 
243
                print "Ideally should never raise"
-
 
244
                raise
76
        except:
245
        except:
77
            logging.debug('Error while getting GCM Details for User Id:- '+ str(record.get('user_id'))+" and Notification Id:- "+str(record.get('notification_campaign_id')))
246
            logging.debug('Error while getting GCM Details for User Id:- '+ str(record.get('user_id'))+" and Notification Id:- "+str(record.get('notification_campaign_id')))
78
            traceback.print_exc() 
247
            traceback.print_exc() 
79
            continue
248
            continue
80
        result_url = ""
249
        result_url = ""
81
        if campaign.get('type')=='url':
250
        if campaign.get('type')=='url':
82
            parsed_uri = urlparse(campaign.get('url'))
251
            parsed_uri = urlparse(campaign.get('url'))
83
            domain = '{uri.netloc}'.format(uri=parsed_uri)
252
            domain = '{uri.netloc}'.format(uri=parsed_uri)
84
            logging.debug('Affiliate Domain:-'+str(domain))
-
 
85
            logging.debug('User Id:-'+str(record.get('user_id'))+' And GCM Reg Id:- '+ str(detailsMap.get('gcm_regid')))
-
 
86
            store = domainStoresMap.get(domain)
253
            store = domainStoresMap.get(domain)
87
            if store is not None:
254
            if store is not None:
88
                url_params = { 'url' : campaign.get('url'),  'userId' : record.get('user_id'), 'storeId' : store[0] }
255
                url_params = { 'url' : campaign.get('url'),  'userId' : record.get('user_id'), 'storeId' : store[0] }
89
                encoded_url_params = urllib.urlencode(url_params)
256
                encoded_url_params = urllib.urlencode(url_params)
90
                DTR_API_BASIC_AUTH = base64.encodestring('%s:%s' % ("dtr", "dtr18Feb2015")).replace('\n', '')
257
                DTR_API_BASIC_AUTH = base64.encodestring('%s:%s' % ("dtr", "dtr18Feb2015")).replace('\n', '')
91
                
258
                
92
                pushpostrequest = urllib2.Request('http://api.profittill.com/pushnotifications/generateAffiliateUrl', encoded_url_params, headers=aff_url_headers)
259
                pushpostrequest = urllib2.Request('http://api.profittill.com/pushnotifications/generateAffiliateUrl', encoded_url_params, headers=aff_url_headers)
93
                pushpostrequest.add_header("Authorization", "Basic %s" % DTR_API_BASIC_AUTH)
260
                pushpostrequest.add_header("Authorization", "Basic %s" % DTR_API_BASIC_AUTH)
94
                json_result =  json.loads(urllib2.urlopen(pushpostrequest).read())
261
                json_result =  json.loads(urllib2.urlopen(pushpostrequest).read())
95
                result_url = json_result['url']
262
                result_url = json_result['url']
96
                logging.debug('User Id:-'+str(record.get('user_id'))+' Notification Url:- '+ str(result_url))
-
 
97
            else:
263
            else:
98
                queryString = urlparse(campaign.get('url').strip()).query
264
                queryString = urlparse(campaign.get('url').strip()).query
99
                parsed_url = parse_qs(queryString)
265
                parsed_url = parse_qs(queryString)
100
                if not parsed_url.has_key('user_id'):
266
                if not parsed_url.has_key('user_id'):
101
                    if len(queryString)>0:
267
                    if len(queryString)>0:
102
                        result_url = campaign.get('url').strip()+'&user_id='+str(record.get('user_id'))
268
                        result_url = campaign.get('url').strip()+'&user_id='+str(record.get('user_id'))
103
                        logging.debug('User Id:-'+str(record.get('user_id'))+' Notification Url:- '+ str(result_url))
-
 
104
                    else:
269
                    else:
105
                        result_url = campaign.get('url').strip()+'?user_id='+str(record.get('user_id'))
270
                        result_url = campaign.get('url').strip()+'?user_id='+str(record.get('user_id'))
106
                        logging.debug('User Id:-'+str(record.get('user_id'))+' Notification Url:- '+ str(result_url))
-
 
107
                else:
271
                else:
108
                    logging.debug('User Id:-'+str(record.get('user_id'))+' Notification Url:- '+ str(record.get('user_id')))
272
                    result_url = campaign.get('url').strip()
109
        if campaign.get('url') is None or str(campaign.get('url'))=='':
273
        if campaign.get('url') is None or str(campaign.get('url'))=='':
110
            result_url = 'http://api.profittill.com/deals?user_id='+str(record.get('user_id'))
274
            result_url = 'http://api.profittill.com/deals?user_id='+str(record.get('user_id'))
111
        data = {"message":campaign.get('message'),"cid":str(campaign.get('_id'))+"_"+str(record.get('_id')),"title":campaign.get('title'),
275
        data = {"message":campaign.get('message'),"cid":str(campaign.get('_id'))+"_"+str(record.get('_id')),"title":campaign.get('title'),
112
                "type":campaign.get('type'),"url":result_url.strip(),"vibrate":1,"sound":1,"largeIcon":"large_icon",
276
                "type":campaign.get('type'),"url":result_url.strip(),"vibrate":1,"sound":1,"largeIcon":"large_icon",
113
                "smallIcon":"small_icon","priority":"high","time_to_live":long(campaign.get('expiresat'))-long(time.mktime(datetime.now().timetuple()))}
277
                "smallIcon":"small_icon","priority":"high","time_to_live":long(campaign.get('expiresat'))-long(time.mktime(datetime.now().timetuple()))}
114
        
278
        
115
        post_data = {}
279
        post_data = {}
116
        
280
        
117
        post_data['data'] = data
281
        post_data['data'] = data
118
        regIds = []
282
        regIds = []
119
        regIds.append(detailsMap.get('gcm_regid'))
283
        regIds.append(gcm_regid)
120
        post_data['registration_ids'] = regIds
284
        post_data['registration_ids'] = regIds
121
         
285
         
122
        post_data_json = json.dumps(post_data)
286
        post_data_json = json.dumps(post_data)
123
        logging.debug('User Id:- '+str(record.get('user_id'))+' Post Data Json :- '+str(post_data_json))
287
        logging.debug('User Id:- '+str(record.get('user_id'))+' Post Data Json :- '+str(post_data_json))
124
        response = requests.post(GCM_URL, data=post_data_json, headers=headers)
288
        response = requests.post(GCM_URL, data=post_data_json, headers=headers)
125
        logging.debug('User Id:- '+str(record.get('user_id'))+' GCM_ID:- '+str(gcm_id)+' Response :-'+str(response.text))
289
        logging.debug('User Id:- '+str(record.get('user_id'))+' GCM_ID:- '+str(gcm_id)+' Response :-'+str(response.text))
126
        result = json.loads(response.text)
290
        result = json.loads(response.text)
127
        if result["success"]:
291
        if result["success"]:
128
            get_mongo_connection(host=mongoHost).User.pushnotifications.update({'_id':record.get('_id')},{"$set":{'message':'success','type':'sent','sent_timestamp':to_java_date(datetime.now())}})
292
            get_mongo_connection(host=options.mongoHost).User.pushnotifications.update({'_id':record.get('_id')},{"$set":{'message':'success','type':'sent','sent_timestamp':to_java_date(datetime.now())}})
129
            logging.debug('User Id:- '+str(record.get('user_id'))+' GCM_ID:- '+str(gcm_id)+' Update Response :- Notification Success True')
-
 
130
        else:
293
        else:
131
            get_mongo_connection(host=mongoHost).User.pushnotifications.update({'_id':record.get('_id')},{"$set":{'message':result["results"][0]["error"],'type':'failed','sent_timestamp':to_java_date(datetime.now())}})
294
            get_mongo_connection(host=options.mongoHost).User.pushnotifications.update({'_id':record.get('_id')},{"$set":{'message':result["results"][0]["error"],'type':'failed','sent_timestamp':to_java_date(datetime.now())}})
132
            logging.debug('User Id:- '+str(record.get('user_id'))+' GCM_ID:- '+str(gcm_id)+' Update Response :- Notification Fail True')
295
            failedGcm.append(gcm_id)
133
            updateGcmUserSql = "update gcm_users set failurecount=failurecount+1 where gcm_regid='%s'"%(detailsMap.get('gcm_regid'))
296
#             updateGcmUserSql = "update gcm_users set failurecount=failurecount+1 where gcm_regid='%s'"%(gcm_regid)
134
            logging.debug('Update GCM User Query :-'+str(updateGcmUserSql))
297
#             logging.debug('Update GCM User Query :-'+str(updateGcmUserSql))
135
            try:
298
#             try:
136
                dtrdb = MySQLdb.connect('localhost',"root","shop2020","dtr" )
299
#                 dtrdb = MySQLdb.connect('localhost',"root","shop2020","dtr" )
137
                cursor = dtrdb.cursor()
300
#                 cursor = dtrdb.cursor()
138
                cursor.execute(updateGcmUserSql)
301
#                 cursor.execute(updateGcmUserSql)
139
                dtrdb.commit()
302
#                 dtrdb.commit()
140
                session.commit()
303
#                 session.commit()
141
                dtrdb.close()
304
#                 dtrdb.close()
142
            except:
305
#             except:
143
                dtrdb.rollback()
306
#                 dtrdb.rollback()
144
                dtrdb.close()
307
#                 dtrdb.close()
145
 
308
 
146
def chunks(l, n):
309
def chunks(l, n):
147
    """Yield successive n-sized chunks from l."""
310
    """Yield successive n-sized chunks from l."""
148
    for i in xrange(0, len(l), n):
311
    for i in xrange(0, len(l), n):
149
        yield l[i:i+n]
312
        yield l[i:i+n]
150
 
313
 
151
class __PushNotification:
-
 
152
    def __init__(self, notification_campaign_id, user_id, message, type, sms_type, sms_id, android_id, pushed_by, sent_timestamp, receive_timestamp, open_timestamp, sms_timestamp, gcm_id, created_timestamp, gcm_expired, notification_accounted):
-
 
153
        self.notification_campaign_id = notification_campaign_id
-
 
154
        self.user_id = user_id
-
 
155
        self.message = message
-
 
156
        self.type = type
-
 
157
        self.sms_type = sms_type
-
 
158
        self.sms_id = sms_id
-
 
159
        self.android_id = android_id
-
 
160
        self.pushed_by = pushed_by
-
 
161
        self.sent_timestamp = sent_timestamp
-
 
162
        self.receive_timestamp = receive_timestamp
-
 
163
        self.open_timestamp = open_timestamp
-
 
164
        self.sms_timestamp = sms_timestamp
-
 
165
        self.gcm_id = gcm_id
-
 
166
        self.created_timestamp = created_timestamp
-
 
167
        self.gcm_expired = gcm_expired
-
 
168
        self.notification_accounted = notification_accounted
-
 
169
 
-
 
170
def getPendingCampaigns():
-
 
171
    campaigns = list(get_mongo_connection(host=mongoHost).User.notificationcampaigns.find({'notification_processed':0,'expiresat':{'$gte':to_java_date(datetime.now())}}))
-
 
172
    return campaigns
-
 
173
 
-
 
174
def populateCampaignsMap(pendingCampaigns):
-
 
175
    global campaignUsersMap
-
 
176
    global notificationCampaignsMap
-
 
177
    for campaign in pendingCampaigns:
-
 
178
        cursor.execute(str(campaign['sql']))
-
 
179
        user_records = cursor.fetchall()
-
 
180
        userids = []
-
 
181
        for record in user_records:
-
 
182
            if str(record[0]) not in userids:
-
 
183
                userids.append(str(record[0]))
-
 
184
        campaignUsersMap[long(campaign['_id'])]=userids
-
 
185
        notificationCampaignsMap[long(campaign['_id'])] = campaign
-
 
186
        
-
 
187
def insertPushNotificationEntriesToSent():
-
 
188
    global userGcmRegIdMap
-
 
189
    for campaignId, userList in campaignUsersMap.items():
-
 
190
        logging.debug("CampaignId:- "+str(campaignId))
-
 
191
        if len(userList)==0:
-
 
192
            continue
-
 
193
        logging.debug("GCM_REG_SQL_1:- "+GCM_REG_ID_SQL1%(",".join(map(str,userList))))
-
 
194
        cursor.execute(GCM_REG_ID_SQL1%(",".join(map(str,userList))))
-
 
195
        result_data = cursor.fetchall()
-
 
196
        
-
 
197
        if result_data and len(result_data)>0:
-
 
198
            user_list = []
-
 
199
            for userId in userList:
-
 
200
                user_list.append(userId)
-
 
201
                
-
 
202
            for dataRec in result_data:
-
 
203
                '''
-
 
204
                if str(dataRec[0]) in user_list:
-
 
205
                    user_list.remove(str(dataRec[0]))
-
 
206
                '''
-
 
207
                if userGcmRegIdMap.has_key(dataRec[0]):
-
 
208
                    detailMap = {}
-
 
209
                    gcmRegIdMap = userGcmRegIdMap.get(dataRec[0])
-
 
210
                    detailMap['gcm_regid'] = dataRec[1]
-
 
211
                    if dataRec[3] is not None:
-
 
212
                        detailMap['android_id'] = dataRec[3]
-
 
213
                    gcmRegIdMap[dataRec[2]]= detailMap
-
 
214
                    userGcmRegIdMap[dataRec[0]] = gcmRegIdMap
-
 
215
                else:
-
 
216
                    gcmRegIdMap = {}
-
 
217
                    detailMap = {}
-
 
218
                    detailMap['gcm_regid'] = dataRec[1]
-
 
219
                    if dataRec[3] is not None:
-
 
220
                        detailMap['android_id'] = dataRec[3]
-
 
221
                    gcmRegIdMap[dataRec[2]]= detailMap
-
 
222
                    userGcmRegIdMap[dataRec[0]] = gcmRegIdMap
-
 
223
            
-
 
224
            logging.debug("Old Users.."+str(user_list))
-
 
225
            logging.debug("GCM_REG_SQL_2:- "+GCM_REG_ID_SQL2%(",".join(map(str,user_list))))       
-
 
226
            cursor.execute(GCM_REG_ID_SQL2%(",".join(map(str,user_list))))
-
 
227
            result_data = cursor.fetchall()
-
 
228
            for dataRec in result_data:
-
 
229
                if userGcmRegIdMap.has_key(dataRec[0]):
-
 
230
                    detailMap = {}
-
 
231
                    gcmRegIdMap = userGcmRegIdMap.get(dataRec[0])
-
 
232
                    detailMap['gcm_regid'] = dataRec[1]
-
 
233
                    if dataRec[3] is not None:
-
 
234
                        detailMap['android_id'] = dataRec[3]
-
 
235
                    gcmRegIdMap[dataRec[2]]= detailMap
-
 
236
                    userGcmRegIdMap[dataRec[0]] = gcmRegIdMap
-
 
237
                else:
-
 
238
                    gcmRegIdMap = {}
-
 
239
                    detailMap = {}
-
 
240
                    detailMap['gcm_regid'] = dataRec[1]
-
 
241
                    if dataRec[3] is not None:
-
 
242
                        detailMap['android_id'] = dataRec[3]
-
 
243
                    gcmRegIdMap[dataRec[2]]= detailMap
-
 
244
                    userGcmRegIdMap[dataRec[0]] = gcmRegIdMap
-
 
245
        else:
-
 
246
            logging.debug("GCM_REG_SQL_2:- "+GCM_REG_ID_SQL2%(",".join(map(str,userList))))
-
 
247
            cursor.execute(GCM_REG_ID_SQL2%(",".join(map(str,userList))))   
-
 
248
            result_data = cursor.fetchall()
-
 
249
            for dataRec in result_data:
-
 
250
                if userGcmRegIdMap.has_key(dataRec[0]):
-
 
251
                    detailMap = {}
-
 
252
                    gcmRegIdMap = userGcmRegIdMap.get(dataRec[0])
-
 
253
                    detailMap['gcm_regid'] = dataRec[1]
-
 
254
                    if dataRec[3] is not None:
-
 
255
                        detailMap['android_id'] = dataRec[3]
-
 
256
                    gcmRegIdMap[dataRec[2]]= detailMap
-
 
257
                    userGcmRegIdMap[dataRec[0]] = gcmRegIdMap
-
 
258
                else:
-
 
259
                    gcmRegIdMap = {}
-
 
260
                    detailMap = {}
-
 
261
                    detailMap['gcm_regid'] = dataRec[1]
-
 
262
                    if dataRec[3] is not None:
-
 
263
                        detailMap['android_id'] = dataRec[3]
-
 
264
                    gcmRegIdMap[dataRec[2]]= detailMap
-
 
265
                    userGcmRegIdMap[dataRec[0]] = gcmRegIdMap   
-
 
266
                    
-
 
267
    logging.debug("CampaignUsersMap"+str(campaignUsersMap)) 
-
 
268
        
-
 
269
    for campaignId, userList in campaignUsersMap.items():
-
 
270
        for userId in userList:
-
 
271
            gcmRegIdMap = userGcmRegIdMap.get(long(userId))
-
 
272
            if gcmRegIdMap is None:
-
 
273
                gcmRegIdMap = {}
-
 
274
            for gcm_id, details in gcmRegIdMap.items():
-
 
275
                android_id = None
-
 
276
                logging.debug("User Id:- "+str(userId)+" ..User Details:- "+str(details))
-
 
277
                if details.has_key('android_id'):
-
 
278
                    android_id = details['android_id']
-
 
279
                pushNotificationObj = __PushNotification(long(campaignId), long(userId), None, 'pending', \
-
 
280
                                    None, None, android_id, "php", None, None, None, None, gcm_id, to_java_date(datetime.now()),0,0)
-
 
281
                get_mongo_connection(host=mongoHost).User.pushnotifications.insert(pushNotificationObj.__dict__)
-
 
282
        
-
 
283
def populatePendingNotificationEntriesToBeSent():
-
 
284
    global pendingNotificationEntryMap
-
 
285
    pendingNotificationEntries = list(get_mongo_connection(host=mongoHost).User.pushnotifications.find({'type':'pending'}))
-
 
286
    for entry in pendingNotificationEntries:
-
 
287
        if pendingNotificationEntryMap.has_key(entry['notification_campaign_id']):
-
 
288
            entries = pendingNotificationEntryMap.get(entry['notification_campaign_id'])
-
 
289
            entries.append(entry)
-
 
290
            pendingNotificationEntryMap[entry['notification_campaign_id']] = entries
-
 
291
        else:
-
 
292
            entries = []
-
 
293
            entries.append(entry)
-
 
294
            pendingNotificationEntryMap[entry['notification_campaign_id']] = entries
-
 
295
    
-
 
296
def initiateNotificationThreadProcess(chunkSize):
-
 
297
    count = 1
-
 
298
    logging.debug('Starting Push Notification Job....'+str(datetime.now()))
-
 
299
    for entries in pendingNotificationEntryMap.values():
-
 
300
        campaign_receivers_list = list(chunks(entries, chunkSize))
-
 
301
        print len(campaign_receivers_list)
-
 
302
        for sublist in campaign_receivers_list:
-
 
303
            thread = NotificationThread(count, "Thread-"+str(count), sublist)
-
 
304
            thread.start()
-
 
305
            count = count +1
-
 
306
    logging.debug('Stopping Push Notification Job....'+str(datetime.now()))
-
 
307
    
-
 
308
def markNotificationCampaignsProcessed():
314
def markNotificationCampaignsProcessed():
309
    for campaign in notificationCampaignsMap.values():
315
    for campaign in notificationCampaignsMap.values():
310
        logging.debug('Notification Campaign....'+str(campaign.get('_id'))+"...Marked Processed. "+str(datetime.now()))
316
        logging.debug('Notification Campaign....'+str(campaign.get('_id'))+"...Marked Processed. "+str(datetime.now()))
311
        get_mongo_connection(host=mongoHost).User.notificationcampaigns.update({'_id':campaign.get('_id')},{"$set":{'notification_processed':1}})     
317
        get_mongo_connection(host=options.mongoHost).User.notificationcampaigns.update({'_id':campaign.get('_id')},{"$set":{'notification_processed':1}})
-
 
318
 
-
 
319
def updateFailedGcm():
-
 
320
    updateGcmUserSql = "update gcm_users set failurecount=failurecount+1 where id in (%s)"
-
 
321
    logging.debug("updateGcmUserSql:- "+updateGcmUserSql%(",".join(map(str,failedGcm))))
-
 
322
    if not failedGcm:
-
 
323
        return
-
 
324
    cursor.execute(updateGcmUserSql%(",".join(map(str,failedGcm))))
312
    
325
    
-
 
326
    try:
-
 
327
        db.commit()
-
 
328
    except:
-
 
329
        traceback.print_exc()
-
 
330
        db.rollback()
-
 
331
 
-
 
332
 
313
def main():
333
def main():
314
    global mongoHost
334
    try:
315
    parser = optparse.OptionParser()
-
 
316
    parser.add_option("-C", "--chunksize", dest="chunksize",
-
 
317
                      default="100",
-
 
318
                      type="int", help="The requsets a single thread handles",
-
 
319
                      metavar="CHUNKSIZE")
-
 
320
    parser.add_option("-M", "--mongo_host", dest="mongo_host",
-
 
321
                      default="localhost",
-
 
322
                      type="str", help="The requsets a single thread handles",
-
 
323
                      metavar="MONGOHOST")
-
 
324
    (options, args) = parser.parse_args()
-
 
325
    mongoHost = options.mongo_host
-
 
326
    pendingCampaigns = getPendingCampaigns()
335
        pendingCampaigns = getPendingCampaigns()
327
    populateCampaignsMap(pendingCampaigns)
336
        populateCampaignsMap(pendingCampaigns)
328
    insertPushNotificationEntriesToSent()
337
        insertPushNotificationEntriesToSent()
-
 
338
        markNotificationCampaignsProcessed()
329
    populatePendingNotificationEntriesToBeSent()
339
        populatePendingNotificationEntriesToBeSent()
330
    initiateNotificationThreadProcess(options.chunksize)
340
        initiateNotificationThreadProcess(options.chunksize)
-
 
341
        updateFailedGcm()
-
 
342
    except:
331
    markNotificationCampaignsProcessed()
343
        traceback.print_exc()
332
    
344
    finally:
333
    db.close()
345
        db.close()
-
 
346
 
334
 
347
 
335
if __name__=='__main__':
348
if __name__ == '__main__':
336
    main()
349
    main()
337
    
350
    
338
351