| 30 |
ashish |
1 |
%%
|
|
|
2 |
%% Licensed to the Apache Software Foundation (ASF) under one
|
|
|
3 |
%% or more contributor license agreements. See the NOTICE file
|
|
|
4 |
%% distributed with this work for additional information
|
|
|
5 |
%% regarding copyright ownership. The ASF licenses this file
|
|
|
6 |
%% to you under the Apache License, Version 2.0 (the
|
|
|
7 |
%% "License"); you may not use this file except in compliance
|
|
|
8 |
%% with the License. You may obtain a copy of the License at
|
|
|
9 |
%%
|
|
|
10 |
%% http://www.apache.org/licenses/LICENSE-2.0
|
|
|
11 |
%%
|
|
|
12 |
%% Unless required by applicable law or agreed to in writing,
|
|
|
13 |
%% software distributed under the License is distributed on an
|
|
|
14 |
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
|
|
15 |
%% KIND, either express or implied. See the License for the
|
|
|
16 |
%% specific language governing permissions and limitations
|
|
|
17 |
%% under the License.
|
|
|
18 |
%%
|
|
|
19 |
|
|
|
20 |
-module(thrift_processor).
|
|
|
21 |
|
|
|
22 |
-export([init/1]).
|
|
|
23 |
|
|
|
24 |
-include("thrift_constants.hrl").
|
|
|
25 |
-include("thrift_protocol.hrl").
|
|
|
26 |
|
|
|
27 |
%% Per-connection processor state.
%%   handler      - module whose handle_function/2 is invoked for each request
%%   in_protocol  - protocol that incoming messages and arguments are read from
%%   out_protocol - protocol that replies are written to and flushed on
%%   service      - module queried via function_info/2 for parameter, reply
%%                  and exception type information
-record(thrift_processor, {handler, in_protocol, out_protocol, service}).
|
|
|
28 |
|
|
|
29 |
%% Entry point of a processor.
%%
%% Takes a single 4-tuple `{Server, ProtoGen, Service, Handler}':
%%   Server   - ignored here; kept only so the tuple shape callers pass in
%%              stays the same
%%   ProtoGen - zero-arity fun that must return `{ok, InProtocol, OutProtocol}'
%%              (anything else fails the match and crashes the caller)
%%   Service  - module queried through function_info/2
%%   Handler  - module whose handle_function/2 serves each request
%%
%% Builds the processor state and enters loop/1; the result (or exit) of
%% loop/1 is the result of this function.
init({_Server, ProtoGen, Service, Handler}) when is_function(ProtoGen, 0) ->
    {ok, IProt, OProt} = ProtoGen(),
    loop(#thrift_processor{in_protocol = IProt,
                           out_protocol = OProt,
                           service = Service,
                           handler = Handler}).
|
|
|
35 |
|
|
|
36 |
%% Main request loop: reads one message header from the input protocol and
%% dispatches on it.
%%
%%   CALL / ONEWAY message -> handle the named function, then loop again
%%   {error, timeout}      -> close the output transport and return `ok'
%%   {error, closed}       -> close the output transport and exit(shutdown)
%%
%% Any other value returned by the read is unexpected and crashes the
%% process with a case_clause error.
loop(State = #thrift_processor{in_protocol = IProto,
                               out_protocol = OProto,
                               service = Service}) ->
    case thrift_protocol:read(IProto, message_begin) of
        #protocol_message_begin{name = Function, type = Type}
          when Type =:= ?tMessageType_CALL; Type =:= ?tMessageType_ONEWAY ->
            ok = handle_function(State, existing_function_atom(Service, Function)),
            loop(State);
        {error, timeout} ->
            thrift_protocol:close_transport(OProto),
            ok;
        {error, closed} ->
            thrift_protocol:close_transport(OProto),
            exit(shutdown)
    end.

%% Converts a function name received from the peer into an atom WITHOUT
%% creating new atoms. The name comes off the wire, and atoms are never
%% garbage collected, so list_to_atom/1 here would let a remote peer exhaust
%% the atom table by sending arbitrary names.
%%
%% If the first lookup fails, the service module may simply not be loaded yet
%% (its function names only become atoms once it is), so it is loaded once
%% and the lookup retried. A name that is still unknown raises `badarg',
%% crashing the processor just as a call to an undefined function would.
existing_function_atom(Service, Name) ->
    try
        list_to_existing_atom(Name)
    catch
        error:badarg ->
            _ = is_atom(Service) andalso code:ensure_loaded(Service),
            list_to_existing_atom(Name)
    end.
|
|
|
55 |
|
|
|
56 |
%% Serves a single request for `Function'.
%%
%% Looks up the parameter type of the function in the service module, reads
%% the arguments from the input protocol, runs the handler and sends the
%% outcome. Anything raised while running the handler OR while sending the
%% success reply is routed to handle_function_catch/4. Finally the output
%% transport is flushed; the function returns what after_reply/1 returns.
handle_function(State = #thrift_processor{in_protocol = IProto,
                                          out_protocol = OProto,
                                          handler = Handler,
                                          service = Service},
                Function) ->
    ParamsType = Service:function_info(Function, params_type),
    {ok, Args} = thrift_protocol:read(IProto, ParamsType),

    %% The value of the try expression is intentionally discarded; only its
    %% side effects (the reply written to the client) matter.
    try
        handle_success(State, Function, Handler:handle_function(Function, Args))
    catch
        Class:Reason ->
            handle_function_catch(State, Function, Class, Reason)
    end,
    after_reply(OProto).
|
|
|
76 |
|
|
|
77 |
%% Decides what to do with an exception raised while serving `Function'.
%%
%%   oneway function            -> log a warning (with stacktrace), send nothing
%%   throw of a non-empty tuple -> treat it as a service exception and reply
%%                                 through handle_exception/3
%%   error class                -> reply through handle_error/3
%%
%% Any other class/reason combination (for example an exit, or a thrown
%% non-tuple) matches no clause and crashes with a case_clause error.
%% Returns `ok' in every handled case.
handle_function_catch(State = #thrift_processor{service = Service},
                      Function, ErrType, ErrData) ->
    case Service:function_info(Function, reply_type) of
        oneway_void ->
            %% The client is not waiting for an answer, so the failure can
            %% only be logged.
            Stack = erlang:get_stacktrace(),
            error_logger:warning_msg(
              "oneway void ~p threw error which must be ignored: ~p",
              [Function, {ErrType, ErrData, Stack}]),
            ok;
        _OtherReplyType ->
            case {ErrType, ErrData} of
                {throw, Exception} when is_tuple(Exception),
                                        tuple_size(Exception) > 0 ->
                    error_logger:warning_msg("~p threw exception: ~p~n",
                                             [Function, Exception]),
                    handle_exception(State, Function, Exception),
                    %% keep serving further requests from this client
                    ok;
                {error, Error} ->
                    ok = handle_error(State, Function, Error)
            end
    end.
|
|
|
97 |
|
|
|
98 |
%% Sends the reply for a handler call that returned normally.
%%
%% `Result' is the value returned by the handler:
%%   {reply, Data} -> Data is wrapped in a "<Function>_result" struct whose
%%                    field 0 has the function's reply type, and sent
%%   ok            -> for a function whose reply type is an empty struct an
%%                    empty "<Function>_result" struct is sent; for a oneway
%%                    function nothing is sent
%%
%% Any other handler result (or `ok' for a function with another reply type)
%% matches no clause and raises a case_clause error. Returns `ok'.
handle_success(#thrift_processor{out_protocol = OProto,
                                 service = Service},
               Function,
               Result) ->
    ReplyType = Service:function_info(Function, reply_type),
    StructName = atom_to_list(Function) ++ "_result",

    ok = case Result of
             {reply, ReplyData} ->
                 Reply = {{struct, [{0, ReplyType}]}, {StructName, ReplyData}},
                 send_reply(OProto, Function, ?tMessageType_REPLY, Reply);

             ok when ReplyType =:= {struct, []} ->
                 send_reply(OProto, Function, ?tMessageType_REPLY,
                            {ReplyType, {StructName}});

             ok when ReplyType =:= oneway_void ->
                 %% no reply for oneway void
                 ok
         end.
|
|
|
117 |
|
|
|
118 |
%% Replies to the client with an exception thrown by the handler.
%%
%% `Exception' is a tuple whose first element names its type. The service
%% module describes the exceptions declared for `Function' as
%%   {struct, [{Fid, {struct, {Module, Type}}}, ...]}
%% For each declared entry a slot is produced: the thrown exception where the
%% declared Type equals the thrown type, `undefined' otherwise. For example,
%% with declared types [type0, type1, type2] and a thrown type1 exception the
%% slots are [undefined, Exception, undefined].
%%
%% If at least one slot is filled, the slots (prefixed with the function
%% name) are sent as a normal REPLY message; otherwise the exception was not
%% declared for this function and handle_unknown_exception/3 takes over.
%% Returns `ok'.
handle_exception(State = #thrift_processor{out_protocol = OProto,
                                           service = Service},
                 Function,
                 Exception) ->
    ThrownType = element(1, Exception),
    ReplySpec = Service:function_info(Function, exceptions),
    {struct, Declared} = ReplySpec,
    true = is_list(Declared),

    %% Entries that do not have the expected shape are skipped by the
    %% generator pattern rather than causing a failure.
    Slots = [if
                 Type =:= ThrownType -> Exception;
                 true -> undefined
             end
             || {_Fid, {struct, {_Module, Type}}} <- Declared],

    case lists:any(fun(Slot) -> Slot =/= undefined end, Slots) of
        true ->
            ResultTuple = list_to_tuple([Function | Slots]),
            ok = send_reply(OProto, Function, ?tMessageType_REPLY,
                            {ReplySpec, ResultTuple});
        false ->
            ok = handle_unknown_exception(State, Function, Exception)
    end.
|
|
|
148 |
|
|
|
149 |
%%
%% Handles an exception that the service threw explicitly but that is not
%% among the exceptions declared for the function: it is reported through
%% handle_error/3, tagged as `exception_not_declared_as_thrown'.
%%
handle_unknown_exception(State, Function, Exception) ->
    Reason = {exception_not_declared_as_thrown, Exception},
    handle_error(State, Function, Reason).
|
|
|
156 |
|
|
|
157 |
%% Logs a handler error and reports it to the client as a
%% TApplicationException of type UNKNOWN, sent as an EXCEPTION message.
%% Returns the result of send_reply/4.
handle_error(#thrift_processor{out_protocol = OProto}, Function, Error) ->
    %% Fetch the stacktrace before doing anything else.
    Stack = erlang:get_stacktrace(),
    error_logger:error_msg("~p had an error: ~p~n", [Function, {Error, Stack}]),

    AppException = #'TApplicationException'{
                      message = error_reply_message(Error, Stack),
                      type = ?TApplicationException_UNKNOWN},
    send_reply(OProto, Function, ?tMessageType_EXCEPTION,
               {?TApplicationException_Structure, AppException}).

%% Builds the message text sent to the client. The error term and stacktrace
%% are only included when the `thrift' application environment has
%% `exceptions_include_traces' set to `true'; otherwise a generic text is used.
error_reply_message(Error, Stack) ->
    case application:get_env(thrift, exceptions_include_traces) of
        {ok, true} ->
            lists:flatten(io_lib:format("An error occurred: ~p~n",
                                        [{Error, Stack}]));
        _ ->
            "An unknown handler error occurred."
    end.
|
|
|
174 |
|
|
|
175 |
%% Writes one complete reply message to the output protocol and flushes it.
%%
%% The message consists of a header (function name as a string, the given
%% message type, and a sequence id that is always 0), the reply payload and
%% the message end marker, written in that order. Every write and the final
%% flush must return `ok'; anything else crashes with a badmatch.
%% Returns `ok'.
send_reply(OProto, Function, ReplyMessageType, Reply) ->
    Header = #protocol_message_begin{name = atom_to_list(Function),
                                     type = ReplyMessageType,
                                     seqid = 0},
    lists:foreach(fun(Part) -> ok = thrift_protocol:write(OProto, Part) end,
                  [Header, Reply, message_end]),
    ok = thrift_protocol:flush_transport(OProto).
|
|
|
184 |
|
|
|
185 |
%% Runs after every handled request: flushes the output transport and leaves
%% it open. The flush must return `ok'; the function returns `ok'.
after_reply(Protocol) ->
    ok = thrift_protocol:flush_transport(Protocol).
|