Subversion Repositories SmartDukaan

Rev

Go to most recent revision | Details | Last modification | View Log | RSS feed

Rev Author Line No. Line
30 ashish 1
%%
2
%% Licensed to the Apache Software Foundation (ASF) under one
3
%% or more contributor license agreements. See the NOTICE file
4
%% distributed with this work for additional information
5
%% regarding copyright ownership. The ASF licenses this file
6
%% to you under the Apache License, Version 2.0 (the
7
%% "License"); you may not use this file except in compliance
8
%% with the License. You may obtain a copy of the License at
9
%%
10
%%   http://www.apache.org/licenses/LICENSE-2.0
11
%%
12
%% Unless required by applicable law or agreed to in writing,
13
%% software distributed under the License is distributed on an
14
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15
%% KIND, either express or implied. See the License for the
16
%% specific language governing permissions and limitations
17
%% under the License.
18
%%
19
 
20
-module(thrift_client).
21
 
22
-behaviour(gen_server).
23
 
24
%% API
25
-export([start_link/2, start_link/3, start_link/4,
26
         start/3, start/4,
27
         call/3, send_call/3, close/1]).
28
 
29
%% gen_server callbacks
30
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
31
         terminate/2, code_change/3]).
32
 
33
 
34
-include("thrift_constants.hrl").
35
-include("thrift_protocol.hrl").
36
 
37
-record(state, {service, protocol, seqid}).
38
 
39
%%====================================================================
40
%% API
41
%%====================================================================
42
%%--------------------------------------------------------------------
43
%% Function: start_link() -> {ok,Pid} | ignore | {error,Error}
44
%% Description: Starts the server as a linked process.
45
%%--------------------------------------------------------------------
46
start_link(Host, Port, Service) when is_integer(Port), is_atom(Service) ->
47
    start_link(Host, Port, Service, []).
48
 
49
start_link(Host, Port, Service, Options) ->
50
    start(Host, Port, Service, [{monitor, link} | Options]).
51
 
52
start_link(ProtocolFactory, Service) ->
53
    start(ProtocolFactory, Service, [{monitor, link}]).
54
 
55
%%
56
%% Splits client options into protocol options and transport options
57
%%
58
%% split_options([Options...]) -> {ProtocolOptions, TransportOptions}
59
%%
60
split_options(Options) ->
61
    split_options(Options, [], [], []).
62
 
63
split_options([], ClientIn, ProtoIn, TransIn) ->
64
    {ClientIn, ProtoIn, TransIn};
65
 
66
split_options([Opt = {OptKey, _} | Rest], ClientIn, ProtoIn, TransIn)
67
  when OptKey =:= monitor ->
68
    split_options(Rest, [Opt | ClientIn], ProtoIn, TransIn);
69
 
70
split_options([Opt = {OptKey, _} | Rest], ClientIn, ProtoIn, TransIn)
71
  when OptKey =:= strict_read;
72
       OptKey =:= strict_write ->
73
    split_options(Rest, ClientIn, [Opt | ProtoIn], TransIn);
74
 
75
split_options([Opt = {OptKey, _} | Rest], ClientIn, ProtoIn, TransIn)
76
  when OptKey =:= framed;
77
       OptKey =:= connect_timeout;
78
       OptKey =:= sockopts ->
79
    split_options(Rest, ClientIn, ProtoIn, [Opt | TransIn]).
80
 
81
 
82
%%--------------------------------------------------------------------
83
%% Function: start() -> {ok,Pid} | ignore | {error,Error}
84
%% Description: Starts the server as an unlinked process.
85
%%--------------------------------------------------------------------
86
 
87
%% Backwards-compatible starter for the common-case of socket transports
88
start(Host, Port, Service, Options)
89
  when is_integer(Port), is_atom(Service), is_list(Options) ->
90
    {ClientOpts, ProtoOpts, TransOpts} = split_options(Options),
91
 
92
    {ok, TransportFactory} =
93
        thrift_socket_transport:new_transport_factory(Host, Port, TransOpts),
94
 
95
    {ok, ProtocolFactory} = thrift_binary_protocol:new_protocol_factory(
96
                              TransportFactory, ProtoOpts),
97
 
98
    start(ProtocolFactory, Service, ClientOpts).
99
 
100
 
101
%% ProtocolFactory :: fun() -> thrift_protocol()
102
start(ProtocolFactory, Service, ClientOpts)
103
  when is_function(ProtocolFactory), is_atom(Service) ->
104
    {Starter, Opts} =
105
        case lists:keysearch(monitor, 1, ClientOpts) of
106
            {value, {monitor, link}} ->
107
                {start_link, []};
108
            {value, {monitor, tether}} ->
109
                {start, [{tether, self()}]};
110
            _ ->
111
                {start, []}
112
        end,
113
 
114
    Connect =
115
        case lists:keysearch(connect, 1, ClientOpts) of
116
            {value, {connect, Choice}} ->
117
                Choice;
118
            _ ->
119
                %% By default, connect at creation-time.
120
                true
121
        end,
122
 
123
 
124
    Started = gen_server:Starter(?MODULE, [Service, Opts], []),
125
 
126
    if
127
        Connect ->
128
            case Started of
129
                {ok, Pid} ->
130
                    case gen_server:call(Pid, {connect, ProtocolFactory}) of
131
                        ok ->
132
                            {ok, Pid};
133
                        Error ->
134
                            Error
135
                    end;
136
                Else ->
137
                    Else
138
            end;
139
        true ->
140
            Started
141
    end.
142
 
143
call(Client, Function, Args)
144
  when is_pid(Client), is_atom(Function), is_list(Args) ->
145
    case gen_server:call(Client, {call, Function, Args}) of
146
        R = {ok, _} -> R;
147
        R = {error, _} -> R;
148
        {exception, Exception} -> throw(Exception)
149
    end.
150
 
151
cast(Client, Function, Args)
152
  when is_pid(Client), is_atom(Function), is_list(Args) ->
153
    gen_server:cast(Client, {call, Function, Args}).
154
 
155
%% Sends a function call but does not read the result. This is useful
156
%% if you're trying to log non-oneway function calls to write-only
157
%% transports like thrift_disk_log_transport.
158
send_call(Client, Function, Args)
159
  when is_pid(Client), is_atom(Function), is_list(Args) ->
160
    gen_server:call(Client, {send_call, Function, Args}).
161
 
162
close(Client) when is_pid(Client) ->
163
    gen_server:cast(Client, close).
164
 
165
%%====================================================================
166
%% gen_server callbacks
167
%%====================================================================
168
 
169
%%--------------------------------------------------------------------
170
%% Function: init(Args) -> {ok, State} |
171
%%                         {ok, State, Timeout} |
172
%%                         ignore               |
173
%%                         {stop, Reason}
174
%% Description: Initiates the server
175
%%--------------------------------------------------------------------
176
init([Service, Opts]) ->
177
    case lists:keysearch(tether, 1, Opts) of
178
        {value, {tether, Pid}} ->
179
            erlang:monitor(process, Pid);
180
        _Else ->
181
            ok
182
    end,
183
    {ok, #state{service = Service}}.
184
 
185
%%--------------------------------------------------------------------
186
%% Function: %% handle_call(Request, From, State) -> {reply, Reply, State} |
187
%%                                      {reply, Reply, State, Timeout} |
188
%%                                      {noreply, State} |
189
%%                                      {noreply, State, Timeout} |
190
%%                                      {stop, Reason, Reply, State} |
191
%%                                      {stop, Reason, State}
192
%% Description: Handling call messages
193
%%--------------------------------------------------------------------
194
handle_call({connect, ProtocolFactory}, _From,
195
            State = #state{service = Service}) ->
196
    case ProtocolFactory() of
197
        {ok, Protocol} ->
198
            {reply, ok, State#state{protocol = Protocol,
199
                                    seqid = 0}};
200
        Error ->
201
            {stop, normal, Error, State}
202
    end;
203
 
204
handle_call({call, Function, Args}, _From, State = #state{service = Service}) ->
205
    Result = catch_function_exceptions(
206
               fun() ->
207
                       ok = send_function_call(State, Function, Args),
208
                       receive_function_result(State, Function)
209
               end,
210
               Service),
211
    {reply, Result, State};
212
 
213
 
214
handle_call({send_call, Function, Args}, _From, State = #state{service = Service}) ->
215
    Result = catch_function_exceptions(
216
               fun() ->
217
                       send_function_call(State, Function, Args)
218
               end,
219
               Service),
220
    {reply, Result, State}.
221
 
222
 
223
%% Helper function that catches exceptions thrown by sending or receiving
224
%% a function and returns the correct response for call or send_only above.
225
catch_function_exceptions(Fun, Service) ->
226
    try
227
        Fun()
228
    catch
229
        throw:{return, Return} ->
230
            Return;
231
          error:function_clause ->
232
            ST = erlang:get_stacktrace(),
233
            case hd(ST) of
234
                {Service, function_info, [Function, _]} ->
235
                    {error, {no_function, Function}};
236
                _ -> throw({error, {function_clause, ST}})
237
            end
238
    end.
239
 
240
 
241
%%--------------------------------------------------------------------
242
%% Function: handle_cast(Msg, State) -> {noreply, State} |
243
%%                                      {noreply, State, Timeout} |
244
%%                                      {stop, Reason, State}
245
%% Description: Handling cast messages
246
%%--------------------------------------------------------------------
247
handle_cast({call, Function, Args}, State = #state{service = Service,
248
                                                   protocol = Protocol,
249
                                                   seqid = SeqId}) ->
250
    _Result =
251
        try
252
            ok = send_function_call(State, Function, Args),
253
            receive_function_result(State, Function)
254
        catch
255
            Class:Reason ->
256
                error_logger:error_msg("error ignored in handle_cast({cast,...},...): ~p:~p~n", [Class, Reason])
257
        end,
258
 
259
    {noreply, State};
260
 
261
handle_cast(close, State=#state{protocol = Protocol}) ->
262
%%     error_logger:info_msg("thrift_client ~p received close", [self()]),
263
    {stop,normal,State};
264
handle_cast(_Msg, State) ->
265
    {noreply, State}.
266
 
267
%%--------------------------------------------------------------------
268
%% Function: handle_info(Info, State) -> {noreply, State} |
269
%%                                       {noreply, State, Timeout} |
270
%%                                       {stop, Reason, State}
271
%% Description: Handling all non call/cast messages
272
%%--------------------------------------------------------------------
273
handle_info({'DOWN', MonitorRef, process, Pid, _Info}, State)
274
  when is_reference(MonitorRef), is_pid(Pid) ->
275
    %% We don't actually verify the correctness of the DOWN message.
276
    {stop, parent_died, State};
277
 
278
handle_info(_Info, State) ->
279
    {noreply, State}.
280
 
281
%%--------------------------------------------------------------------
282
%% Function: terminate(Reason, State) -> void()
283
%% Description: This function is called by a gen_server when it is about to
284
%% terminate. It should be the opposite of Module:init/1 and do any necessary
285
%% cleaning up. When it returns, the gen_server terminates with Reason.
286
%% The return value is ignored.
287
%%--------------------------------------------------------------------
288
terminate(Reason, State = #state{protocol=undefined}) ->
289
    ok;
290
terminate(Reason, State = #state{protocol=Protocol}) ->
291
    thrift_protocol:close_transport(Protocol),
292
    ok.
293
 
294
%%--------------------------------------------------------------------
295
%% Func: code_change(OldVsn, State, Extra) -> {ok, NewState}
296
%% Description: Convert process state when code is changed
297
%%--------------------------------------------------------------------
298
code_change(_OldVsn, State, _Extra) ->
299
    {ok, State}.
300
 
301
%%--------------------------------------------------------------------
302
%%% Internal functions
303
%%--------------------------------------------------------------------
304
send_function_call(#state{protocol = Proto,
305
                          service  = Service,
306
                          seqid    = SeqId},
307
                   Function,
308
                   Args) ->
309
    Params = Service:function_info(Function, params_type),
310
    {struct, PList} = Params,
311
    if
312
        length(PList) =/= length(Args) ->
313
            throw({return, {error, {bad_args, Function, Args}}});
314
        true -> ok
315
    end,
316
 
317
    Begin = #protocol_message_begin{name = atom_to_list(Function),
318
                                    type = ?tMessageType_CALL,
319
                                    seqid = SeqId},
320
    ok = thrift_protocol:write(Proto, Begin),
321
    ok = thrift_protocol:write(Proto, {Params, list_to_tuple([Function | Args])}),
322
    ok = thrift_protocol:write(Proto, message_end),
323
    thrift_protocol:flush_transport(Proto),
324
    ok.
325
 
326
receive_function_result(State = #state{protocol = Proto,
327
                                       service = Service},
328
                        Function) ->
329
    ResultType = Service:function_info(Function, reply_type),
330
    read_result(State, Function, ResultType).
331
 
332
read_result(_State,
333
            _Function,
334
            oneway_void) ->
335
    {ok, ok};
336
 
337
read_result(State = #state{protocol = Proto,
338
                           seqid    = SeqId},
339
            Function,
340
            ReplyType) ->
341
    case thrift_protocol:read(Proto, message_begin) of
342
        #protocol_message_begin{seqid = RetSeqId} when RetSeqId =/= SeqId ->
343
            {error, {bad_seq_id, SeqId}};
344
 
345
        #protocol_message_begin{type = ?tMessageType_EXCEPTION} ->
346
            handle_application_exception(State);
347
 
348
        #protocol_message_begin{type = ?tMessageType_REPLY} ->
349
            handle_reply(State, Function, ReplyType)
350
    end.
351
 
352
handle_reply(State = #state{protocol = Proto,
353
                            service = Service},
354
             Function,
355
             ReplyType) ->
356
    {struct, ExceptionFields} = Service:function_info(Function, exceptions),
357
    ReplyStructDef = {struct, [{0, ReplyType}] ++ ExceptionFields},
358
    {ok, Reply} = thrift_protocol:read(Proto, ReplyStructDef),
359
    ReplyList = tuple_to_list(Reply),
360
    true = length(ReplyList) == length(ExceptionFields) + 1,
361
    ExceptionVals = tl(ReplyList),
362
    Thrown = [X || X <- ExceptionVals,
363
                   X =/= undefined],
364
    Result =
365
        case Thrown of
366
            [] when ReplyType == {struct, []} ->
367
                {ok, ok};
368
            [] ->
369
                {ok, hd(ReplyList)};
370
            [Exception] ->
371
                {exception, Exception}
372
        end,
373
    ok = thrift_protocol:read(Proto, message_end),
374
    Result.
375
 
376
handle_application_exception(State = #state{protocol = Proto}) ->
377
    {ok, Exception} = thrift_protocol:read(Proto,
378
                                           ?TApplicationException_Structure),
379
    ok = thrift_protocol:read(Proto, message_end),
380
    XRecord = list_to_tuple(
381
                ['TApplicationException' | tuple_to_list(Exception)]),
382
    error_logger:error_msg("X: ~p~n", [XRecord]),
383
    true = is_record(XRecord, 'TApplicationException'),
384
    {exception, XRecord}.