Subversion Repositories SmartDukaan

Rev

Rev 30 | Details | Compare with Previous | Last modification | View Log | RSS feed

Rev Author Line No. Line
30 ashish 1
<?php
2
/*
3
 * Licensed to the Apache Software Foundation (ASF) under one
4
 * or more contributor license agreements. See the NOTICE file
5
 * distributed with this work for additional information
6
 * regarding copyright ownership. The ASF licenses this file
7
 * to you under the Apache License, Version 2.0 (the
8
 * "License"); you may not use this file except in compliance
9
 * with the License. You may obtain a copy of the License at
10
 *
11
 *   http://www.apache.org/licenses/LICENSE-2.0
12
 *
13
 * Unless required by applicable law or agreed to in writing,
14
 * software distributed under the License is distributed on an
15
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16
 * KIND, either express or implied. See the License for the
17
 * specific language governing permissions and limitations
18
 * under the License.
19
 *
20
 * @package thrift.transport
21
 */
22
 
23
 
24
/** Inherits from Socket */
25
include_once $GLOBALS['THRIFT_ROOT'].'/transport/TSocket.php';
26
 
27
/**
28
 * This library makes use of APC cache to mark hosts as down in a web
29
 * environment. If you are running from the CLI or on a system without APC
30
 * installed, then these null functions will step in and act like cache
31
 * misses.
32
 */
33
// Each shim is guarded on its own name. A single guard on apc_fetch alone
// would redeclare apc_store (a fatal error) when only one of the two
// functions is already provided by the environment.
if (!function_exists('apc_fetch')) {
  /**
   * Stand-in for apc_fetch() that always reports a cache miss.
   *
   * @param string $key Cache key (ignored)
   * @return bool Always FALSE
   */
  function apc_fetch($key) { return FALSE; }
}
if (!function_exists('apc_store')) {
  /**
   * Stand-in for apc_store() that stores nothing.
   *
   * @param string $key Cache key (ignored)
   * @param mixed  $var Value to store (ignored)
   * @param int    $ttl Time to live in seconds (ignored)
   * @return bool Always FALSE
   */
  function apc_store($key, $var, $ttl=0) { return FALSE; }
}
37
 
38
/**
39
 * Sockets implementation of the TTransport interface that allows connection
40
 * to a pool of servers.
41
 *
42
 * @package thrift.transport
43
 */
44
class TSocketPool extends TSocket {

  /**
   * Remote servers. Array of associative arrays with 'host' and 'port' keys
   *
   * @var array
   */
  private $servers_ = array();

  /**
   * How many times to retry each host in connect
   *
   * @var int
   */
  private $numRetries_ = 1;

  /**
   * Retry interval in seconds, how long to not try a host if it has been
   * marked as down.
   *
   * @var int
   */
  private $retryInterval_ = 60;

  /**
   * Max consecutive failures before marking a host down.
   *
   * @var int
   */
  private $maxConsecutiveFailures_ = 1;

  /**
   * Try hosts in order? or Randomized?
   *
   * @var bool
   */
  private $randomize_ = TRUE;

  /**
   * Always try last host, even if marked down?
   *
   * @var bool
   */
  private $alwaysTryLast_ = TRUE;

  /**
   * Socket pool constructor
   *
   * Builds the server list by pairing each entry of $hosts with the port
   * found under the same key in $ports, or with $ports itself when a single
   * non-array port is given.
   *
   * @param array  $hosts        List of remote hostnames
   * @param mixed  $ports        Array of remote ports, or a single common port
   * @param bool   $persist      Whether to use a persistent socket
   * @param mixed  $debugHandler Function for error logging
   */
  public function __construct($hosts=array('localhost'),
                              $ports=array(9090),
                              $persist=FALSE,
                              $debugHandler=null) {
    // Host and port of the underlying socket are filled in by open()
    parent::__construct(null, 0, $persist, $debugHandler);

    // A non-array $ports value is one common port shared by every host
    $sharedPort = !is_array($ports);

    foreach ($hosts as $key => $host) {
      $this->servers_[] = array('host' => $host,
                                'port' => $sharedPort ? $ports : $ports[$key]);
    }
  }

  /**
   * Add a server to the pool
   *
   * This function does not prevent you from adding a duplicate server entry.
   *
   * @param string $host hostname or IP
   * @param int $port port
   */
  public function addServer($host, $port) {
    $this->servers_[] = array('host' => $host, 'port' => $port);
  }

  /**
   * Sets how many times to keep retrying a host in the connect function.
   *
   * @param int $numRetries
   */
  public function setNumRetries($numRetries) {
    $this->numRetries_ = $numRetries;
  }

  /**
   * Sets how long to wait until retrying a host if it was marked down
   *
   * @param int $retryInterval Interval in seconds
   */
  public function setRetryInterval($retryInterval) {
    $this->retryInterval_ = $retryInterval;
  }

  /**
   * Sets how many consecutive failures are tolerated before a host is
   * marked as down.
   *
   * @param int $maxConsecutiveFailures
   */
  public function setMaxConsecutiveFailures($maxConsecutiveFailures) {
    $this->maxConsecutiveFailures_ = $maxConsecutiveFailures;
  }

  /**
   * Turns randomization in connect order on or off.
   *
   * @param bool $randomize
   */
  public function setRandomize($randomize) {
    $this->randomize_ = $randomize;
  }

  /**
   * Whether to always try the last server.
   *
   * @param bool $alwaysTryLast
   */
  public function setAlwaysTryLast($alwaysTryLast) {
    $this->alwaysTryLast_ = $alwaysTryLast;
  }


  /**
   * Connects the socket by iterating through all the servers in the pool
   * and trying to find one that works.
   *
   * Per-host state is kept in the APC cache under two keys:
   * 'thrift_failtime:<host>:<port>~' (time the host was marked down, 0 when
   * up) and 'thrift_consecfails:<host>:<port>~' (current run of failures).
   * A host marked down is skipped until retryInterval_ seconds have elapsed,
   * unless it is the last server and alwaysTryLast_ is set.
   *
   * Returns as soon as one connection succeeds.
   *
   * @throws TException if no server in the pool could be connected to
   */
  public function open() {
    // Check if we want order randomization
    if ($this->randomize_) {
      shuffle($this->servers_);
    }

    // Count servers to identify the "last" one
    $numServers = count($this->servers_);

    for ($i = 0; $i < $numServers; ++$i) {

      // Read the fields explicitly; extract() would create locals implicitly
      $host = $this->servers_[$i]['host'];
      $port = $this->servers_[$i]['port'];

      // Cache keys recording this server's failure state
      $failtimeKey = 'thrift_failtime:'.$host.':'.$port.'~';
      $consecfailsKey = 'thrift_consecfails:'.$host.':'.$port.'~';

      // Cache miss? Assume it's OK
      $lastFailtime = apc_fetch($failtimeKey);
      if ($lastFailtime === FALSE) {
        $lastFailtime = 0;
      }

      $retryIntervalPassed = FALSE;

      // Cache hit...make sure the retry interval has elapsed
      if ($lastFailtime > 0) {
        $elapsed = time() - $lastFailtime;
        if ($elapsed > $this->retryInterval_) {
          $retryIntervalPassed = TRUE;
          // debug_ / debugHandler_ are not declared in this class;
          // presumably they are provided by TSocket
          if ($this->debug_) {
            call_user_func($this->debugHandler_,
                           'TSocketPool: retryInterval '.
                           '('.$this->retryInterval_.') '.
                           'has passed for host '.$host.':'.$port);
          }
        }
      }

      // Only connect if not in the middle of a fail interval, OR if this
      // is the LAST server we are trying, just hammer away on it
      $isLastServer = FALSE;
      if ($this->alwaysTryLast_) {
        $isLastServer = ($i == ($numServers - 1));
      }

      // $retryIntervalPassed can only be TRUE when $lastFailtime > 0, so no
      // separate check of $lastFailtime is needed for that case
      $shouldTry = ($lastFailtime === 0) ||
                   $isLastServer ||
                   $retryIntervalPassed;
      if (!$shouldTry) {
        continue;
      }

      // Set underlying TSocket params to this one
      $this->host_ = $host;
      $this->port_ = $port;

      // Try up to numRetries_ connections per server
      for ($attempt = 0; $attempt < $this->numRetries_; $attempt++) {
        try {
          // Use the underlying TSocket open function
          parent::open();

          // Only clear the failure time if required to do so
          if ($lastFailtime > 0) {
            apc_store($failtimeKey, 0);
          }

          // A success ends any run of consecutive failures. Without this
          // reset, failures separated by successful connections would keep
          // accumulating towards maxConsecutiveFailures_. Only write when
          // there is something to clear.
          if (apc_fetch($consecfailsKey) > 0) {
            apc_store($consecfailsKey, 0);
          }

          // Successful connection, return now
          return;

        } catch (TException $tx) {
          // Connection failed; fall through to the next attempt
        }
      }

      // Every attempt on this host failed: record it in the cache.
      // Ignore cache misses
      $consecfails = apc_fetch($consecfailsKey);
      if ($consecfails === FALSE) {
        $consecfails = 0;
      }

      // Increment by one
      $consecfails++;

      // Log and cache this failure
      if ($consecfails >= $this->maxConsecutiveFailures_) {
        if ($this->debug_) {
          call_user_func($this->debugHandler_,
                         'TSocketPool: marking '.$host.':'.$port.
                         ' as down for '.$this->retryInterval_.' secs '.
                         'after '.$consecfails.' failed attempts.');
        }
        // Store the failure time
        apc_store($failtimeKey, time());

        // Clear the count of consecutive failures
        apc_store($consecfailsKey, 0);
      } else {
        apc_store($consecfailsKey, $consecfails);
      }
    }

    // No server in the pool accepted a connection
    $error = 'TSocketPool: All hosts in pool are down. ';
    $hosts = array();
    foreach ($this->servers_ as $server) {
      $hosts []= $server['host'].':'.$server['port'];
    }
    $hostlist = implode(',', $hosts);
    $error .= '('.$hostlist.')';
    if ($this->debug_) {
      call_user_func($this->debugHandler_, $error);
    }
    throw new TException($error);
  }
}
295
 
296
?>