| 30 |
ashish |
1 |
%%
|
|
|
2 |
%% Licensed to the Apache Software Foundation (ASF) under one
|
|
|
3 |
%% or more contributor license agreements. See the NOTICE file
|
|
|
4 |
%% distributed with this work for additional information
|
|
|
5 |
%% regarding copyright ownership. The ASF licenses this file
|
|
|
6 |
%% to you under the Apache License, Version 2.0 (the
|
|
|
7 |
%% "License"); you may not use this file except in compliance
|
|
|
8 |
%% with the License. You may obtain a copy of the License at
|
|
|
9 |
%%
|
|
|
10 |
%% http://www.apache.org/licenses/LICENSE-2.0
|
|
|
11 |
%%
|
|
|
12 |
%% Unless required by applicable law or agreed to in writing,
|
|
|
13 |
%% software distributed under the License is distributed on an
|
|
|
14 |
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
|
|
15 |
%% KIND, either express or implied. See the License for the
|
|
|
16 |
%% specific language governing permissions and limitations
|
|
|
17 |
%% under the License.
|
|
|
18 |
%%
|
|
|
19 |
|
|
|
20 |
-module(thrift_client).
|
|
|
21 |
|
|
|
22 |
-behaviour(gen_server).
|
|
|
23 |
|
|
|
24 |
%% API
|
|
|
25 |
-export([start_link/2, start_link/3, start_link/4,
|
|
|
26 |
start/3, start/4,
|
|
|
27 |
call/3, send_call/3, close/1]).
|
|
|
28 |
|
|
|
29 |
%% gen_server callbacks
|
|
|
30 |
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
|
|
|
31 |
terminate/2, code_change/3]).
|
|
|
32 |
|
|
|
33 |
|
|
|
34 |
-include("thrift_constants.hrl").
|
|
|
35 |
-include("thrift_protocol.hrl").
|
|
|
36 |
|
|
|
37 |
-record(state, {service, protocol, seqid}).
|
|
|
38 |
|
|
|
39 |
%%====================================================================
|
|
|
40 |
%% API
|
|
|
41 |
%%====================================================================
|
|
|
42 |
%%--------------------------------------------------------------------
|
|
|
43 |
%% Function: start_link() -> {ok,Pid} | ignore | {error,Error}
|
|
|
44 |
%% Description: Starts the server as a linked process.
|
|
|
45 |
%%--------------------------------------------------------------------
|
|
|
46 |
start_link(Host, Port, Service) when is_integer(Port), is_atom(Service) ->
|
|
|
47 |
start_link(Host, Port, Service, []).
|
|
|
48 |
|
|
|
49 |
start_link(Host, Port, Service, Options) ->
|
|
|
50 |
start(Host, Port, Service, [{monitor, link} | Options]).
|
|
|
51 |
|
|
|
52 |
start_link(ProtocolFactory, Service) ->
|
|
|
53 |
start(ProtocolFactory, Service, [{monitor, link}]).
|
|
|
54 |
|
|
|
55 |
%%
|
|
|
56 |
%% Splits client options into protocol options and transport options
|
|
|
57 |
%%
|
|
|
58 |
%% split_options([Options...]) -> {ProtocolOptions, TransportOptions}
|
|
|
59 |
%%
|
|
|
60 |
split_options(Options) ->
|
|
|
61 |
split_options(Options, [], [], []).
|
|
|
62 |
|
|
|
63 |
split_options([], ClientIn, ProtoIn, TransIn) ->
|
|
|
64 |
{ClientIn, ProtoIn, TransIn};
|
|
|
65 |
|
|
|
66 |
split_options([Opt = {OptKey, _} | Rest], ClientIn, ProtoIn, TransIn)
|
|
|
67 |
when OptKey =:= monitor ->
|
|
|
68 |
split_options(Rest, [Opt | ClientIn], ProtoIn, TransIn);
|
|
|
69 |
|
|
|
70 |
split_options([Opt = {OptKey, _} | Rest], ClientIn, ProtoIn, TransIn)
|
|
|
71 |
when OptKey =:= strict_read;
|
|
|
72 |
OptKey =:= strict_write ->
|
|
|
73 |
split_options(Rest, ClientIn, [Opt | ProtoIn], TransIn);
|
|
|
74 |
|
|
|
75 |
split_options([Opt = {OptKey, _} | Rest], ClientIn, ProtoIn, TransIn)
|
|
|
76 |
when OptKey =:= framed;
|
|
|
77 |
OptKey =:= connect_timeout;
|
|
|
78 |
OptKey =:= sockopts ->
|
|
|
79 |
split_options(Rest, ClientIn, ProtoIn, [Opt | TransIn]).
|
|
|
80 |
|
|
|
81 |
|
|
|
82 |
%%--------------------------------------------------------------------
|
|
|
83 |
%% Function: start() -> {ok,Pid} | ignore | {error,Error}
|
|
|
84 |
%% Description: Starts the server as an unlinked process.
|
|
|
85 |
%%--------------------------------------------------------------------
|
|
|
86 |
|
|
|
87 |
%% Backwards-compatible starter for the common-case of socket transports
|
|
|
88 |
start(Host, Port, Service, Options)
|
|
|
89 |
when is_integer(Port), is_atom(Service), is_list(Options) ->
|
|
|
90 |
{ClientOpts, ProtoOpts, TransOpts} = split_options(Options),
|
|
|
91 |
|
|
|
92 |
{ok, TransportFactory} =
|
|
|
93 |
thrift_socket_transport:new_transport_factory(Host, Port, TransOpts),
|
|
|
94 |
|
|
|
95 |
{ok, ProtocolFactory} = thrift_binary_protocol:new_protocol_factory(
|
|
|
96 |
TransportFactory, ProtoOpts),
|
|
|
97 |
|
|
|
98 |
start(ProtocolFactory, Service, ClientOpts).
|
|
|
99 |
|
|
|
100 |
|
|
|
101 |
%% ProtocolFactory :: fun() -> thrift_protocol()
|
|
|
102 |
start(ProtocolFactory, Service, ClientOpts)
|
|
|
103 |
when is_function(ProtocolFactory), is_atom(Service) ->
|
|
|
104 |
{Starter, Opts} =
|
|
|
105 |
case lists:keysearch(monitor, 1, ClientOpts) of
|
|
|
106 |
{value, {monitor, link}} ->
|
|
|
107 |
{start_link, []};
|
|
|
108 |
{value, {monitor, tether}} ->
|
|
|
109 |
{start, [{tether, self()}]};
|
|
|
110 |
_ ->
|
|
|
111 |
{start, []}
|
|
|
112 |
end,
|
|
|
113 |
|
|
|
114 |
Connect =
|
|
|
115 |
case lists:keysearch(connect, 1, ClientOpts) of
|
|
|
116 |
{value, {connect, Choice}} ->
|
|
|
117 |
Choice;
|
|
|
118 |
_ ->
|
|
|
119 |
%% By default, connect at creation-time.
|
|
|
120 |
true
|
|
|
121 |
end,
|
|
|
122 |
|
|
|
123 |
|
|
|
124 |
Started = gen_server:Starter(?MODULE, [Service, Opts], []),
|
|
|
125 |
|
|
|
126 |
if
|
|
|
127 |
Connect ->
|
|
|
128 |
case Started of
|
|
|
129 |
{ok, Pid} ->
|
|
|
130 |
case gen_server:call(Pid, {connect, ProtocolFactory}) of
|
|
|
131 |
ok ->
|
|
|
132 |
{ok, Pid};
|
|
|
133 |
Error ->
|
|
|
134 |
Error
|
|
|
135 |
end;
|
|
|
136 |
Else ->
|
|
|
137 |
Else
|
|
|
138 |
end;
|
|
|
139 |
true ->
|
|
|
140 |
Started
|
|
|
141 |
end.
|
|
|
142 |
|
|
|
143 |
call(Client, Function, Args)
|
|
|
144 |
when is_pid(Client), is_atom(Function), is_list(Args) ->
|
|
|
145 |
case gen_server:call(Client, {call, Function, Args}) of
|
|
|
146 |
R = {ok, _} -> R;
|
|
|
147 |
R = {error, _} -> R;
|
|
|
148 |
{exception, Exception} -> throw(Exception)
|
|
|
149 |
end.
|
|
|
150 |
|
|
|
151 |
cast(Client, Function, Args)
|
|
|
152 |
when is_pid(Client), is_atom(Function), is_list(Args) ->
|
|
|
153 |
gen_server:cast(Client, {call, Function, Args}).
|
|
|
154 |
|
|
|
155 |
%% Sends a function call but does not read the result. This is useful
|
|
|
156 |
%% if you're trying to log non-oneway function calls to write-only
|
|
|
157 |
%% transports like thrift_disk_log_transport.
|
|
|
158 |
send_call(Client, Function, Args)
|
|
|
159 |
when is_pid(Client), is_atom(Function), is_list(Args) ->
|
|
|
160 |
gen_server:call(Client, {send_call, Function, Args}).
|
|
|
161 |
|
|
|
162 |
close(Client) when is_pid(Client) ->
|
|
|
163 |
gen_server:cast(Client, close).
|
|
|
164 |
|
|
|
165 |
%%====================================================================
|
|
|
166 |
%% gen_server callbacks
|
|
|
167 |
%%====================================================================
|
|
|
168 |
|
|
|
169 |
%%--------------------------------------------------------------------
|
|
|
170 |
%% Function: init(Args) -> {ok, State} |
|
|
|
171 |
%% {ok, State, Timeout} |
|
|
|
172 |
%% ignore |
|
|
|
173 |
%% {stop, Reason}
|
|
|
174 |
%% Description: Initiates the server
|
|
|
175 |
%%--------------------------------------------------------------------
|
|
|
176 |
init([Service, Opts]) ->
|
|
|
177 |
case lists:keysearch(tether, 1, Opts) of
|
|
|
178 |
{value, {tether, Pid}} ->
|
|
|
179 |
erlang:monitor(process, Pid);
|
|
|
180 |
_Else ->
|
|
|
181 |
ok
|
|
|
182 |
end,
|
|
|
183 |
{ok, #state{service = Service}}.
|
|
|
184 |
|
|
|
185 |
%%--------------------------------------------------------------------
|
|
|
186 |
%% Function: %% handle_call(Request, From, State) -> {reply, Reply, State} |
|
|
|
187 |
%% {reply, Reply, State, Timeout} |
|
|
|
188 |
%% {noreply, State} |
|
|
|
189 |
%% {noreply, State, Timeout} |
|
|
|
190 |
%% {stop, Reason, Reply, State} |
|
|
|
191 |
%% {stop, Reason, State}
|
|
|
192 |
%% Description: Handling call messages
|
|
|
193 |
%%--------------------------------------------------------------------
|
|
|
194 |
handle_call({connect, ProtocolFactory}, _From,
|
|
|
195 |
State = #state{service = Service}) ->
|
|
|
196 |
case ProtocolFactory() of
|
|
|
197 |
{ok, Protocol} ->
|
|
|
198 |
{reply, ok, State#state{protocol = Protocol,
|
|
|
199 |
seqid = 0}};
|
|
|
200 |
Error ->
|
|
|
201 |
{stop, normal, Error, State}
|
|
|
202 |
end;
|
|
|
203 |
|
|
|
204 |
handle_call({call, Function, Args}, _From, State = #state{service = Service}) ->
|
|
|
205 |
Result = catch_function_exceptions(
|
|
|
206 |
fun() ->
|
|
|
207 |
ok = send_function_call(State, Function, Args),
|
|
|
208 |
receive_function_result(State, Function)
|
|
|
209 |
end,
|
|
|
210 |
Service),
|
|
|
211 |
{reply, Result, State};
|
|
|
212 |
|
|
|
213 |
|
|
|
214 |
handle_call({send_call, Function, Args}, _From, State = #state{service = Service}) ->
|
|
|
215 |
Result = catch_function_exceptions(
|
|
|
216 |
fun() ->
|
|
|
217 |
send_function_call(State, Function, Args)
|
|
|
218 |
end,
|
|
|
219 |
Service),
|
|
|
220 |
{reply, Result, State}.
|
|
|
221 |
|
|
|
222 |
|
|
|
223 |
%% Helper function that catches exceptions thrown by sending or receiving
|
|
|
224 |
%% a function and returns the correct response for call or send_only above.
|
|
|
225 |
catch_function_exceptions(Fun, Service) ->
|
|
|
226 |
try
|
|
|
227 |
Fun()
|
|
|
228 |
catch
|
|
|
229 |
throw:{return, Return} ->
|
|
|
230 |
Return;
|
|
|
231 |
error:function_clause ->
|
|
|
232 |
ST = erlang:get_stacktrace(),
|
|
|
233 |
case hd(ST) of
|
|
|
234 |
{Service, function_info, [Function, _]} ->
|
|
|
235 |
{error, {no_function, Function}};
|
|
|
236 |
_ -> throw({error, {function_clause, ST}})
|
|
|
237 |
end
|
|
|
238 |
end.
|
|
|
239 |
|
|
|
240 |
|
|
|
241 |
%%--------------------------------------------------------------------
|
|
|
242 |
%% Function: handle_cast(Msg, State) -> {noreply, State} |
|
|
|
243 |
%% {noreply, State, Timeout} |
|
|
|
244 |
%% {stop, Reason, State}
|
|
|
245 |
%% Description: Handling cast messages
|
|
|
246 |
%%--------------------------------------------------------------------
|
|
|
247 |
handle_cast({call, Function, Args}, State = #state{service = Service,
|
|
|
248 |
protocol = Protocol,
|
|
|
249 |
seqid = SeqId}) ->
|
|
|
250 |
_Result =
|
|
|
251 |
try
|
|
|
252 |
ok = send_function_call(State, Function, Args),
|
|
|
253 |
receive_function_result(State, Function)
|
|
|
254 |
catch
|
|
|
255 |
Class:Reason ->
|
|
|
256 |
error_logger:error_msg("error ignored in handle_cast({cast,...},...): ~p:~p~n", [Class, Reason])
|
|
|
257 |
end,
|
|
|
258 |
|
|
|
259 |
{noreply, State};
|
|
|
260 |
|
|
|
261 |
handle_cast(close, State=#state{protocol = Protocol}) ->
|
|
|
262 |
%% error_logger:info_msg("thrift_client ~p received close", [self()]),
|
|
|
263 |
{stop,normal,State};
|
|
|
264 |
handle_cast(_Msg, State) ->
|
|
|
265 |
{noreply, State}.
|
|
|
266 |
|
|
|
267 |
%%--------------------------------------------------------------------
|
|
|
268 |
%% Function: handle_info(Info, State) -> {noreply, State} |
|
|
|
269 |
%% {noreply, State, Timeout} |
|
|
|
270 |
%% {stop, Reason, State}
|
|
|
271 |
%% Description: Handling all non call/cast messages
|
|
|
272 |
%%--------------------------------------------------------------------
|
|
|
273 |
handle_info({'DOWN', MonitorRef, process, Pid, _Info}, State)
|
|
|
274 |
when is_reference(MonitorRef), is_pid(Pid) ->
|
|
|
275 |
%% We don't actually verify the correctness of the DOWN message.
|
|
|
276 |
{stop, parent_died, State};
|
|
|
277 |
|
|
|
278 |
handle_info(_Info, State) ->
|
|
|
279 |
{noreply, State}.
|
|
|
280 |
|
|
|
281 |
%%--------------------------------------------------------------------
|
|
|
282 |
%% Function: terminate(Reason, State) -> void()
|
|
|
283 |
%% Description: This function is called by a gen_server when it is about to
|
|
|
284 |
%% terminate. It should be the opposite of Module:init/1 and do any necessary
|
|
|
285 |
%% cleaning up. When it returns, the gen_server terminates with Reason.
|
|
|
286 |
%% The return value is ignored.
|
|
|
287 |
%%--------------------------------------------------------------------
|
|
|
288 |
terminate(Reason, State = #state{protocol=undefined}) ->
|
|
|
289 |
ok;
|
|
|
290 |
terminate(Reason, State = #state{protocol=Protocol}) ->
|
|
|
291 |
thrift_protocol:close_transport(Protocol),
|
|
|
292 |
ok.
|
|
|
293 |
|
|
|
294 |
%%--------------------------------------------------------------------
|
|
|
295 |
%% Func: code_change(OldVsn, State, Extra) -> {ok, NewState}
|
|
|
296 |
%% Description: Convert process state when code is changed
|
|
|
297 |
%%--------------------------------------------------------------------
|
|
|
298 |
code_change(_OldVsn, State, _Extra) ->
|
|
|
299 |
{ok, State}.
|
|
|
300 |
|
|
|
301 |
%%--------------------------------------------------------------------
|
|
|
302 |
%%% Internal functions
|
|
|
303 |
%%--------------------------------------------------------------------
|
|
|
304 |
send_function_call(#state{protocol = Proto,
|
|
|
305 |
service = Service,
|
|
|
306 |
seqid = SeqId},
|
|
|
307 |
Function,
|
|
|
308 |
Args) ->
|
|
|
309 |
Params = Service:function_info(Function, params_type),
|
|
|
310 |
{struct, PList} = Params,
|
|
|
311 |
if
|
|
|
312 |
length(PList) =/= length(Args) ->
|
|
|
313 |
throw({return, {error, {bad_args, Function, Args}}});
|
|
|
314 |
true -> ok
|
|
|
315 |
end,
|
|
|
316 |
|
|
|
317 |
Begin = #protocol_message_begin{name = atom_to_list(Function),
|
|
|
318 |
type = ?tMessageType_CALL,
|
|
|
319 |
seqid = SeqId},
|
|
|
320 |
ok = thrift_protocol:write(Proto, Begin),
|
|
|
321 |
ok = thrift_protocol:write(Proto, {Params, list_to_tuple([Function | Args])}),
|
|
|
322 |
ok = thrift_protocol:write(Proto, message_end),
|
|
|
323 |
thrift_protocol:flush_transport(Proto),
|
|
|
324 |
ok.
|
|
|
325 |
|
|
|
326 |
receive_function_result(State = #state{protocol = Proto,
|
|
|
327 |
service = Service},
|
|
|
328 |
Function) ->
|
|
|
329 |
ResultType = Service:function_info(Function, reply_type),
|
|
|
330 |
read_result(State, Function, ResultType).
|
|
|
331 |
|
|
|
332 |
read_result(_State,
|
|
|
333 |
_Function,
|
|
|
334 |
oneway_void) ->
|
|
|
335 |
{ok, ok};
|
|
|
336 |
|
|
|
337 |
read_result(State = #state{protocol = Proto,
|
|
|
338 |
seqid = SeqId},
|
|
|
339 |
Function,
|
|
|
340 |
ReplyType) ->
|
|
|
341 |
case thrift_protocol:read(Proto, message_begin) of
|
|
|
342 |
#protocol_message_begin{seqid = RetSeqId} when RetSeqId =/= SeqId ->
|
|
|
343 |
{error, {bad_seq_id, SeqId}};
|
|
|
344 |
|
|
|
345 |
#protocol_message_begin{type = ?tMessageType_EXCEPTION} ->
|
|
|
346 |
handle_application_exception(State);
|
|
|
347 |
|
|
|
348 |
#protocol_message_begin{type = ?tMessageType_REPLY} ->
|
|
|
349 |
handle_reply(State, Function, ReplyType)
|
|
|
350 |
end.
|
|
|
351 |
|
|
|
352 |
handle_reply(State = #state{protocol = Proto,
|
|
|
353 |
service = Service},
|
|
|
354 |
Function,
|
|
|
355 |
ReplyType) ->
|
|
|
356 |
{struct, ExceptionFields} = Service:function_info(Function, exceptions),
|
|
|
357 |
ReplyStructDef = {struct, [{0, ReplyType}] ++ ExceptionFields},
|
|
|
358 |
{ok, Reply} = thrift_protocol:read(Proto, ReplyStructDef),
|
|
|
359 |
ReplyList = tuple_to_list(Reply),
|
|
|
360 |
true = length(ReplyList) == length(ExceptionFields) + 1,
|
|
|
361 |
ExceptionVals = tl(ReplyList),
|
|
|
362 |
Thrown = [X || X <- ExceptionVals,
|
|
|
363 |
X =/= undefined],
|
|
|
364 |
Result =
|
|
|
365 |
case Thrown of
|
|
|
366 |
[] when ReplyType == {struct, []} ->
|
|
|
367 |
{ok, ok};
|
|
|
368 |
[] ->
|
|
|
369 |
{ok, hd(ReplyList)};
|
|
|
370 |
[Exception] ->
|
|
|
371 |
{exception, Exception}
|
|
|
372 |
end,
|
|
|
373 |
ok = thrift_protocol:read(Proto, message_end),
|
|
|
374 |
Result.
|
|
|
375 |
|
|
|
376 |
handle_application_exception(State = #state{protocol = Proto}) ->
|
|
|
377 |
{ok, Exception} = thrift_protocol:read(Proto,
|
|
|
378 |
?TApplicationException_Structure),
|
|
|
379 |
ok = thrift_protocol:read(Proto, message_end),
|
|
|
380 |
XRecord = list_to_tuple(
|
|
|
381 |
['TApplicationException' | tuple_to_list(Exception)]),
|
|
|
382 |
error_logger:error_msg("X: ~p~n", [XRecord]),
|
|
|
383 |
true = is_record(XRecord, 'TApplicationException'),
|
|
|
384 |
{exception, XRecord}.
|