Subversion Repositories SmartDukaan

Rev

Go to most recent revision | Details | Last modification | View Log | RSS feed

Rev Author Line No. Line
30 ashish 1
%%
2
%% Licensed to the Apache Software Foundation (ASF) under one
3
%% or more contributor license agreements. See the NOTICE file
4
%% distributed with this work for additional information
5
%% regarding copyright ownership. The ASF licenses this file
6
%% to you under the Apache License, Version 2.0 (the
7
%% "License"); you may not use this file except in compliance
8
%% with the License. You may obtain a copy of the License at
9
%%
10
%%   http://www.apache.org/licenses/LICENSE-2.0
11
%%
12
%% Unless required by applicable law or agreed to in writing,
13
%% software distributed under the License is distributed on an
14
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15
%% KIND, either express or implied. See the License for the
16
%% specific language governing permissions and limitations
17
%% under the License.
18
%%
19
 
20
-module(thrift_socket_transport).
21
 
22
-behaviour(thrift_transport).
23
 
24
-export([new/1,
25
         new/2,
26
         write/2, read/2, flush/1, close/1,
27
 
28
         new_transport_factory/3]).
29
 
30
%% State carried by a socket transport instance; built in new/2.
-record(data, {
    %% Socket handed to gen_tcp:send/2, gen_tcp:recv/3 and gen_tcp:close/1.
    socket,
    %% Timeout argument given to gen_tcp:recv/3 by read/2.
    recv_timeout = infinity
}).
32
 
33
%% Wraps Sock in a transport using the default options (empty option list).
new(Sock) ->
    new(Sock, []).
35
 
36
%% Wraps Socket in a thrift_transport instance backed by this module.
%%
%% Opts is a list of option tuples. Only {recv_timeout, Timeout} is
%% inspected: when Timeout is a positive integer it is stored and later
%% used as the timeout argument of gen_tcp:recv/3 in read/2. If the option
%% is absent, or its value is anything else, the record default
%% (infinity) is kept.
%%
%% Returns whatever thrift_transport:new/2 returns.
new(Socket, Opts) when is_list(Opts) ->
    State =
        %% lists:keyfind/3 supersedes lists:keysearch/3, which is only
        %% retained for backward compatibility; both pick the first tuple
        %% whose first element is recv_timeout.
        case lists:keyfind(recv_timeout, 1, Opts) of
            {recv_timeout, Timeout} when is_integer(Timeout), Timeout > 0 ->
                #data{socket = Socket, recv_timeout = Timeout};
            _ ->
                #data{socket = Socket}
        end,
    thrift_transport:new(?MODULE, State).
46
 
47
%% Sends Payload (an iolist()) over the transport's socket and returns the
%% result of gen_tcp:send/2 unchanged.
write(#data{socket = Sock}, Payload) ->
    gen_tcp:send(Sock, Payload).
50
 
51
%% Receives Len bytes from the transport's socket, waiting at most the
%% configured recv_timeout, and returns the gen_tcp:recv/3 result.
%% A receive timeout is logged and the socket is closed before the
%% {error, timeout} tuple is handed back; every other result is passed
%% through untouched.
read(#data{socket = Sock, recv_timeout = RecvTimeout}, Len)
  when is_integer(Len), Len >= 0 ->
    recv_result(gen_tcp:recv(Sock, Len, RecvTimeout), Sock).

%% Post-processes the outcome of gen_tcp:recv/3 for read/2.
recv_result({error, timeout} = TimedOut, Sock) ->
    error_logger:info_msg("read timeout: peer conn ~p", [inet:peername(Sock)]),
    gen_tcp:close(Sock),
    TimedOut;
recv_result(Result, _Sock) ->
    Result.
60
 
61
%% Nothing to flush: write/2 sends data straight to the socket, so this
%% always returns ok regardless of the argument.
flush(_Transport) ->
    ok.
64
 
65
%% Closes the transport's underlying socket via gen_tcp:close/1.
close(#data{socket = Sock}) ->
    gen_tcp:close(Sock).
67
 
68
 
69
%%%% FACTORY GENERATION %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
70
 
71
 
72
%% The following "local" record is filled in by parse_factory_options/2
73
%% below. These options can be passed to new_transport_factory/3 in a
74
%% proplists-style option list. They're parsed like this so it is an O(n)
75
%% operation instead of O(n^2)
76
-record(factory_opts, {
    %% Timeout argument given to gen_tcp:connect/4.
    connect_timeout = infinity,
    %% Caller-supplied socket options, appended after the built-in ones.
    sockopts = [],
    %% true -> wrap in thrift_framed_transport,
    %% false -> wrap in thrift_buffered_transport.
    framed = false
}).
79
 
80
%% Folds a list of factory options into a #factory_opts{} record in a
%% single pass. Recognised options are {framed, boolean()},
%% {sockopts, list()} and {connect_timeout, infinity | integer()}; when
%% an option is repeated the last occurrence wins. Anything else fails
%% with function_clause.
parse_factory_options([], Acc) ->
    Acc;
parse_factory_options([Option | Remaining], Acc) ->
    parse_factory_options(Remaining, set_factory_option(Option, Acc)).

%% Stores one recognised option in the accumulator record.
set_factory_option({framed, Framed}, Acc) when is_boolean(Framed) ->
    Acc#factory_opts{framed = Framed};
set_factory_option({sockopts, SockOpts}, Acc) when is_list(SockOpts) ->
    Acc#factory_opts{sockopts = SockOpts};
set_factory_option({connect_timeout, Timeout}, Acc)
  when Timeout =:= infinity; is_integer(Timeout) ->
    Acc#factory_opts{connect_timeout = Timeout}.
88
 
89
 
90
%%
%% Builds a "transport factory": a zero-arity fun that, each time it is
%% called, connects to Host:Port and wraps the new socket in a transport.
%%
%% Options is parsed by parse_factory_options/2 (framed, sockopts,
%% connect_timeout). The fun returns {ok, Transport} on success, where
%% Transport is a framed or buffered transport layered over the socket
%% transport, or the non-ok result of gen_tcp:connect/4 otherwise.
%%
%% NOTE(review): the connect call is wrapped in an old-style `catch`, so
%% an exception raised by gen_tcp:connect/4 is returned from the fun as a
%% value (e.g. {'EXIT', Reason}) rather than propagated.
%%
%% Returns {ok, Fun}.
%%
new_transport_factory(Host, Port, Options) ->
    #factory_opts{connect_timeout = ConnectTimeout,
                  sockopts = ExtraSockOpts,
                  framed = Framed} =
        parse_factory_options(Options, #factory_opts{}),

    %% Built-in options come first; caller-supplied ones are appended.
    SockOpts = [binary, {packet, 0}, {active, false}, {nodelay, true}
                | ExtraSockOpts],

    Factory =
        fun() ->
                case catch gen_tcp:connect(Host, Port, SockOpts, ConnectTimeout) of
                    {ok, Sock} ->
                        {ok, Transport} = thrift_socket_transport:new(Sock),
                        %% The match asserts success; its value, the
                        %% {ok, Wrapped} tuple, is the fun's result.
                        {ok, _Wrapped} =
                            case Framed of
                                true  -> thrift_framed_transport:new(Transport);
                                false -> thrift_buffered_transport:new(Transport)
                            end;
                    Error ->
                        Error
                end
        end,
    {ok, Factory}.