Subversion Repositories SmartDukaan

Rev

Details | Last modification | View Log | RSS feed

Rev Author Line No. Line
21478 rajender 1
/*
2
 * Copyright (C) 2011 The Android Open Source Project
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *      http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16
 
17
package com.android.volley;
18
 
19
import android.os.Handler;
20
import android.os.Looper;
21
 
22
import java.util.ArrayList;
23
import java.util.HashMap;
24
import java.util.HashSet;
25
import java.util.LinkedList;
26
import java.util.List;
27
import java.util.Map;
28
import java.util.Queue;
29
import java.util.Set;
30
import java.util.concurrent.PriorityBlockingQueue;
31
import java.util.concurrent.atomic.AtomicInteger;
32
 
33
/**
34
 * A request dispatch queue with a thread pool of dispatchers.
35
 *
36
 * Calling {@link #add(Request)} will enqueue the given Request for dispatch,
37
 * resolving from either cache or network on a worker thread, and then delivering
38
 * a parsed response on the main thread.
39
 */
40
public class RequestQueue {
41
 
42
    /** Callback interface for completed requests. */
43
    public interface RequestFinishedListener<T> {
44
        /** Called when a request has finished processing. */
45
        void onRequestFinished(Request<T> request);
46
    }
47
 
48
    /** Used for generating monotonically-increasing sequence numbers for requests. */
49
    private AtomicInteger mSequenceGenerator = new AtomicInteger();
50
 
51
    /**
52
     * Staging area for requests that already have a duplicate request in flight.
53
     *
54
     * <ul>
55
     *     <li>containsKey(cacheKey) indicates that there is a request in flight for the given cache
56
     *          key.</li>
57
     *     <li>get(cacheKey) returns waiting requests for the given cache key. The in flight request
58
     *          is <em>not</em> contained in that list. Is null if no requests are staged.</li>
59
     * </ul>
60
     */
61
    private final Map<String, Queue<Request<?>>> mWaitingRequests =
62
            new HashMap<String, Queue<Request<?>>>();
63
 
64
    /**
65
     * The set of all requests currently being processed by this RequestQueue. A Request
66
     * will be in this set if it is waiting in any queue or currently being processed by
67
     * any dispatcher.
68
     */
69
    private final Set<Request<?>> mCurrentRequests = new HashSet<Request<?>>();
70
 
71
    /** The cache triage queue. */
72
    private final PriorityBlockingQueue<Request<?>> mCacheQueue =
73
        new PriorityBlockingQueue<Request<?>>();
74
 
75
    /** The queue of requests that are actually going out to the network. */
76
    private final PriorityBlockingQueue<Request<?>> mNetworkQueue =
77
        new PriorityBlockingQueue<Request<?>>();
78
 
79
    /** Number of network request dispatcher threads to start. */
80
    private static final int DEFAULT_NETWORK_THREAD_POOL_SIZE = 4;
81
 
82
    /** Cache interface for retrieving and storing responses. */
83
    private final Cache mCache;
84
 
85
    /** Network interface for performing requests. */
86
    private final Network mNetwork;
87
 
88
    /** Response delivery mechanism. */
89
    private final ResponseDelivery mDelivery;
90
 
91
    /** The network dispatchers. */
92
    private NetworkDispatcher[] mDispatchers;
93
 
94
    /** The cache dispatcher. */
95
    private CacheDispatcher mCacheDispatcher;
96
 
97
    private List<RequestFinishedListener> mFinishedListeners =
98
            new ArrayList<RequestFinishedListener>();
99
 
100
    /**
101
     * Creates the worker pool. Processing will not begin until {@link #start()} is called.
102
     *
103
     * @param cache A Cache to use for persisting responses to disk
104
     * @param network A Network interface for performing HTTP requests
105
     * @param threadPoolSize Number of network dispatcher threads to create
106
     * @param delivery A ResponseDelivery interface for posting responses and errors
107
     */
108
    public RequestQueue(Cache cache, Network network, int threadPoolSize,
109
            ResponseDelivery delivery) {
110
        mCache = cache;
111
        mNetwork = network;
112
        mDispatchers = new NetworkDispatcher[threadPoolSize];
113
        mDelivery = delivery;
114
    }
115
 
116
    /**
117
     * Creates the worker pool. Processing will not begin until {@link #start()} is called.
118
     *
119
     * @param cache A Cache to use for persisting responses to disk
120
     * @param network A Network interface for performing HTTP requests
121
     * @param threadPoolSize Number of network dispatcher threads to create
122
     */
123
    public RequestQueue(Cache cache, Network network, int threadPoolSize) {
124
        this(cache, network, threadPoolSize,
125
                new ExecutorDelivery(new Handler(Looper.getMainLooper())));
126
    }
127
 
128
    /**
129
     * Creates the worker pool. Processing will not begin until {@link #start()} is called.
130
     *
131
     * @param cache A Cache to use for persisting responses to disk
132
     * @param network A Network interface for performing HTTP requests
133
     */
134
    public RequestQueue(Cache cache, Network network) {
135
        this(cache, network, DEFAULT_NETWORK_THREAD_POOL_SIZE);
136
    }
137
 
138
    /**
139
     * Starts the dispatchers in this queue.
140
     */
141
    public void start() {
142
        stop();  // Make sure any currently running dispatchers are stopped.
143
        // Create the cache dispatcher and start it.
144
        mCacheDispatcher = new CacheDispatcher(mCacheQueue, mNetworkQueue, mCache, mDelivery);
145
        mCacheDispatcher.start();
146
 
147
        // Create network dispatchers (and corresponding threads) up to the pool size.
148
        for (int i = 0; i < mDispatchers.length; i++) {
149
            NetworkDispatcher networkDispatcher = new NetworkDispatcher(mNetworkQueue, mNetwork,
150
                    mCache, mDelivery);
151
            mDispatchers[i] = networkDispatcher;
152
            networkDispatcher.start();
153
        }
154
    }
155
 
156
    /**
157
     * Stops the cache and network dispatchers.
158
     */
159
    public void stop() {
160
        if (mCacheDispatcher != null) {
161
            mCacheDispatcher.quit();
162
        }
163
        for (int i = 0; i < mDispatchers.length; i++) {
164
            if (mDispatchers[i] != null) {
165
                mDispatchers[i].quit();
166
            }
167
        }
168
    }
169
 
170
    /**
171
     * Gets a sequence number.
172
     */
173
    public int getSequenceNumber() {
174
        return mSequenceGenerator.incrementAndGet();
175
    }
176
 
177
    /**
178
     * Gets the {@link Cache} instance being used.
179
     */
180
    public Cache getCache() {
181
        return mCache;
182
    }
183
 
184
    /**
185
     * A simple predicate or filter interface for Requests, for use by
186
     * {@link RequestQueue#cancelAll(RequestFilter)}.
187
     */
188
    public interface RequestFilter {
189
        boolean apply(Request<?> request);
190
    }
191
 
192
    /**
193
     * Cancels all requests in this queue for which the given filter applies.
194
     * @param filter The filtering function to use
195
     */
196
    public void cancelAll(RequestFilter filter) {
197
        synchronized (mCurrentRequests) {
198
            for (Request<?> request : mCurrentRequests) {
199
                if (filter.apply(request)) {
200
                    request.cancel();
201
                }
202
            }
203
        }
204
    }
205
 
206
    /**
207
     * Cancels all requests in this queue with the given tag. Tag must be non-null
208
     * and equality is by identity.
209
     */
210
    public void cancelAll(final Object tag) {
211
        if (tag == null) {
212
            throw new IllegalArgumentException("Cannot cancelAll with a null tag");
213
        }
214
        cancelAll(new RequestFilter() {
215
            @Override
216
            public boolean apply(Request<?> request) {
217
                return request.getTag() == tag;
218
            }
219
        });
220
    }
221
 
222
    /**
223
     * Adds a Request to the dispatch queue.
224
     * @param request The request to service
225
     * @return The passed-in request
226
     */
227
    public <T> Request<T> add(Request<T> request) {
228
        // Tag the request as belonging to this queue and add it to the set of current requests.
229
        request.setRequestQueue(this);
230
        synchronized (mCurrentRequests) {
231
            mCurrentRequests.add(request);
232
        }
233
 
234
        // Process requests in the order they are added.
235
        request.setSequence(getSequenceNumber());
236
        request.addMarker("add-to-queue");
237
 
238
        // If the request is uncacheable, skip the cache queue and go straight to the network.
239
        if (!request.shouldCache()) {
240
            mNetworkQueue.add(request);
241
            return request;
242
        }
243
 
244
        // Insert request into stage if there's already a request with the same cache key in flight.
245
        synchronized (mWaitingRequests) {
246
            String cacheKey = request.getCacheKey();
247
            if (mWaitingRequests.containsKey(cacheKey)) {
248
                // There is already a request in flight. Queue up.
249
                Queue<Request<?>> stagedRequests = mWaitingRequests.get(cacheKey);
250
                if (stagedRequests == null) {
251
                    stagedRequests = new LinkedList<Request<?>>();
252
                }
253
                stagedRequests.add(request);
254
                mWaitingRequests.put(cacheKey, stagedRequests);
255
                if (VolleyLog.DEBUG) {
256
                    VolleyLog.v("Request for cacheKey=%s is in flight, putting on hold.", cacheKey);
257
                }
258
            } else {
259
                // Insert 'null' queue for this cacheKey, indicating there is now a request in
260
                // flight.
261
                mWaitingRequests.put(cacheKey, null);
262
                mCacheQueue.add(request);
263
            }
264
            return request;
265
        }
266
    }
267
 
268
    /**
269
     * Called from {@link Request#finish(String)}, indicating that processing of the given request
270
     * has finished.
271
     *
272
     * <p>Releases waiting requests for <code>request.getCacheKey()</code> if
273
     *      <code>request.shouldCache()</code>.</p>
274
     */
275
    <T> void finish(Request<T> request) {
276
        // Remove from the set of requests currently being processed.
277
        synchronized (mCurrentRequests) {
278
            mCurrentRequests.remove(request);
279
        }
280
        synchronized (mFinishedListeners) {
281
          for (RequestFinishedListener<T> listener : mFinishedListeners) {
282
            listener.onRequestFinished(request);
283
          }
284
        }
285
 
286
        if (request.shouldCache()) {
287
            synchronized (mWaitingRequests) {
288
                String cacheKey = request.getCacheKey();
289
                Queue<Request<?>> waitingRequests = mWaitingRequests.remove(cacheKey);
290
                if (waitingRequests != null) {
291
                    if (VolleyLog.DEBUG) {
292
                        VolleyLog.v("Releasing %d waiting requests for cacheKey=%s.",
293
                                waitingRequests.size(), cacheKey);
294
                    }
295
                    // Process all queued up requests. They won't be considered as in flight, but
296
                    // that's not a problem as the cache has been primed by 'request'.
297
                    mCacheQueue.addAll(waitingRequests);
298
                }
299
            }
300
        }
301
    }
302
 
303
    public  <T> void addRequestFinishedListener(RequestFinishedListener<T> listener) {
304
      synchronized (mFinishedListeners) {
305
        mFinishedListeners.add(listener);
306
      }
307
    }
308
 
309
    /**
310
     * Remove a RequestFinishedListener. Has no effect if listener was not previously added.
311
     */
312
    public  <T> void removeRequestFinishedListener(RequestFinishedListener<T> listener) {
313
      synchronized (mFinishedListeners) {
314
        mFinishedListeners.remove(listener);
315
      }
316
    }
317
}