| 30 |
ashish |
1 |
%%
|
|
|
2 |
%% Licensed to the Apache Software Foundation (ASF) under one
|
|
|
3 |
%% or more contributor license agreements. See the NOTICE file
|
|
|
4 |
%% distributed with this work for additional information
|
|
|
5 |
%% regarding copyright ownership. The ASF licenses this file
|
|
|
6 |
%% to you under the Apache License, Version 2.0 (the
|
|
|
7 |
%% "License"); you may not use this file except in compliance
|
|
|
8 |
%% with the License. You may obtain a copy of the License at
|
|
|
9 |
%%
|
|
|
10 |
%% http://www.apache.org/licenses/LICENSE-2.0
|
|
|
11 |
%%
|
|
|
12 |
%% Unless required by applicable law or agreed to in writing,
|
|
|
13 |
%% software distributed under the License is distributed on an
|
|
|
14 |
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
|
|
15 |
%% KIND, either express or implied. See the License for the
|
|
|
16 |
%% specific language governing permissions and limitations
|
|
|
17 |
%% under the License.
|
|
|
18 |
%%
|
|
|
19 |
|
|
|
20 |
-module(thrift_socket_transport).
|
|
|
21 |
|
|
|
22 |
-behaviour(thrift_transport).
|
|
|
23 |
|
|
|
24 |
-export([new/1,
|
|
|
25 |
new/2,
|
|
|
26 |
write/2, read/2, flush/1, close/1,
|
|
|
27 |
|
|
|
28 |
new_transport_factory/3]).
|
|
|
29 |
|
|
|
30 |
-record(data, {socket,
|
|
|
31 |
recv_timeout=infinity}).
|
|
|
32 |
|
|
|
33 |
new(Socket) ->
|
|
|
34 |
new(Socket, []).
|
|
|
35 |
|
|
|
36 |
new(Socket, Opts) when is_list(Opts) ->
|
|
|
37 |
State =
|
|
|
38 |
case lists:keysearch(recv_timeout, 1, Opts) of
|
|
|
39 |
{value, {recv_timeout, Timeout}}
|
|
|
40 |
when is_integer(Timeout), Timeout > 0 ->
|
|
|
41 |
#data{socket=Socket, recv_timeout=Timeout};
|
|
|
42 |
_ ->
|
|
|
43 |
#data{socket=Socket}
|
|
|
44 |
end,
|
|
|
45 |
thrift_transport:new(?MODULE, State).
|
|
|
46 |
|
|
|
47 |
%% Data :: iolist()
|
|
|
48 |
write(#data{socket = Socket}, Data) ->
|
|
|
49 |
gen_tcp:send(Socket, Data).
|
|
|
50 |
|
|
|
51 |
read(#data{socket=Socket, recv_timeout=Timeout}, Len)
|
|
|
52 |
when is_integer(Len), Len >= 0 ->
|
|
|
53 |
case gen_tcp:recv(Socket, Len, Timeout) of
|
|
|
54 |
Err = {error, timeout} ->
|
|
|
55 |
error_logger:info_msg("read timeout: peer conn ~p", [inet:peername(Socket)]),
|
|
|
56 |
gen_tcp:close(Socket),
|
|
|
57 |
Err;
|
|
|
58 |
Data -> Data
|
|
|
59 |
end.
|
|
|
60 |
|
|
|
61 |
%% We can't really flush - everything is flushed when we write
|
|
|
62 |
flush(_) ->
|
|
|
63 |
ok.
|
|
|
64 |
|
|
|
65 |
close(#data{socket = Socket}) ->
|
|
|
66 |
gen_tcp:close(Socket).
|
|
|
67 |
|
|
|
68 |
|
|
|
69 |
%%%% FACTORY GENERATION %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
|
|
|
70 |
|
|
|
71 |
|
|
|
72 |
%% The following "local" record is filled in by parse_factory_options/2
|
|
|
73 |
%% below. These options can be passed to new_protocol_factory/3 in a
|
|
|
74 |
%% proplists-style option list. They're parsed like this so it is an O(n)
|
|
|
75 |
%% operation instead of O(n^2)
|
|
|
76 |
-record(factory_opts, {connect_timeout = infinity,
|
|
|
77 |
sockopts = [],
|
|
|
78 |
framed = false}).
|
|
|
79 |
|
|
|
80 |
parse_factory_options([], Opts) ->
|
|
|
81 |
Opts;
|
|
|
82 |
parse_factory_options([{framed, Bool} | Rest], Opts) when is_boolean(Bool) ->
|
|
|
83 |
parse_factory_options(Rest, Opts#factory_opts{framed=Bool});
|
|
|
84 |
parse_factory_options([{sockopts, OptList} | Rest], Opts) when is_list(OptList) ->
|
|
|
85 |
parse_factory_options(Rest, Opts#factory_opts{sockopts=OptList});
|
|
|
86 |
parse_factory_options([{connect_timeout, TO} | Rest], Opts) when TO =:= infinity; is_integer(TO) ->
|
|
|
87 |
parse_factory_options(Rest, Opts#factory_opts{connect_timeout=TO}).
|
|
|
88 |
|
|
|
89 |
|
|
|
90 |
%%
|
|
|
91 |
%% Generates a "transport factory" function - a fun which returns a thrift_transport()
|
|
|
92 |
%% instance.
|
|
|
93 |
%% This can be passed into a protocol factory to generate a connection to a
|
|
|
94 |
%% thrift server over a socket.
|
|
|
95 |
%%
|
|
|
96 |
new_transport_factory(Host, Port, Options) ->
|
|
|
97 |
ParsedOpts = parse_factory_options(Options, #factory_opts{}),
|
|
|
98 |
|
|
|
99 |
F = fun() ->
|
|
|
100 |
SockOpts = [binary,
|
|
|
101 |
{packet, 0},
|
|
|
102 |
{active, false},
|
|
|
103 |
{nodelay, true} |
|
|
|
104 |
ParsedOpts#factory_opts.sockopts],
|
|
|
105 |
case catch gen_tcp:connect(Host, Port, SockOpts,
|
|
|
106 |
ParsedOpts#factory_opts.connect_timeout) of
|
|
|
107 |
{ok, Sock} ->
|
|
|
108 |
{ok, Transport} = thrift_socket_transport:new(Sock),
|
|
|
109 |
{ok, BufTransport} =
|
|
|
110 |
case ParsedOpts#factory_opts.framed of
|
|
|
111 |
true -> thrift_framed_transport:new(Transport);
|
|
|
112 |
false -> thrift_buffered_transport:new(Transport)
|
|
|
113 |
end,
|
|
|
114 |
{ok, BufTransport};
|
|
|
115 |
Error ->
|
|
|
116 |
Error
|
|
|
117 |
end
|
|
|
118 |
end,
|
|
|
119 |
{ok, F}.
|