%% Rev 30 | Blame | Compare with Previous | Last modification | View Log | RSS feed
%%
%% Licensed to the Apache Software Foundation (ASF) under one
%% or more contributor license agreements. See the NOTICE file
%% distributed with this work for additional information
%% regarding copyright ownership. The ASF licenses this file
%% to you under the Apache License, Version 2.0 (the
%% "License"); you may not use this file except in compliance
%% with the License. You may obtain a copy of the License at
%%
%%   http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%

%%% Todo: this might be better off as a gen_server type of transport
%%%       that handles stuff like group commit, similar to TFileTransport
%%%       in cpp land
-module(thrift_disk_log_transport).

-behaviour(thrift_transport).

%% API
-export([new/2, new_transport_factory/2, new_transport_factory/3]).

%% thrift_transport callbacks
-export([read/2, write/2, force_flush/1, flush/1, close/1]).

%% state
%%   log            - name of the disk_log this transport writes to
%%   close_on_close - when true, close/1 also closes the disk_log
%%   sync_every     - period in milliseconds between automatic syncs,
%%                    or 'infinity' when no periodic sync is configured
%%   sync_tref      - reference of the running interval timer, or
%%                    'undefined' when no timer was started
-record(dl_transport, {log,
                       close_on_close = false,
                       sync_every = infinity,
                       sync_tref}).


%% Create a transport attached to an already open log.
%%
%% Recognised entries of the Opts list:
%%   {close_on_close, boolean()} - if you'd like this transport to close the
%%       disk_log using disk_log:lclose() when the transport is closed,
%%       pass {close_on_close, true}.
%%   {sync_every, pos_integer()} - sync the log to disk every that many
%%       milliseconds by means of an interval timer.
%%
%% Any other option crashes with function_clause.
new(LogName, Opts) when is_atom(LogName), is_list(Opts) ->
    State = start_sync_timer(parse_opts(Opts, #dl_transport{log = LogName})),
    thrift_transport:new(?MODULE, State).

%% Fold the option list into the state record, one option per step.
parse_opts([], State) ->
    State;
parse_opts([{close_on_close, Bool} | Rest], State) when is_boolean(Bool) ->
    parse_opts(Rest, State#dl_transport{close_on_close = Bool});
parse_opts([{sync_every, Int} | Rest], State) when is_integer(Int), Int > 0 ->
    parse_opts(Rest, State#dl_transport{sync_every = Int}).

%% Start the periodic sync when a sync period was configured and remember
%% the timer reference so that close/1 can cancel it again.
start_sync_timer(#dl_transport{sync_every = N} = State)
  when is_integer(N), N > 0 ->
    %% The last argument of timer:apply_interval/4 is the argument *list*
    %% handed to force_flush/1, hence the state is wrapped in a list.
    {ok, TRef} = timer:apply_interval(N, ?MODULE, force_flush, [State]),
    State#dl_transport{sync_tref = TRef};
start_sync_timer(State) ->
    State.


%%%% TRANSPORT IMPLEMENTATION %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%

%% disk_log_transport is write-only
read(_State, _Len) ->
    {error, no_read_from_disk_log}.

%% Append Data (any iolist) to the log as one binary, asynchronously.
write(#dl_transport{log = Log}, Data) ->
    disk_log:balog(Log, erlang:iolist_to_binary(Data)).

%% Unconditionally sync the log to disk. Exported because the interval
%% timer started in new/2 invokes it through ?MODULE.
force_flush(#dl_transport{log = Log}) ->
    error_logger:info_msg("~p syncing~n", [?MODULE]),
    disk_log:sync(Log).

%% Sync on demand only when no periodic sync is configured; 'infinity' is
%% the record default for sync_every and means "no time-based sync".
flush(#dl_transport{log = Log, sync_every = infinity}) ->
    disk_log:sync(Log);
flush(#dl_transport{}) ->
    %% sync will happen automagically
    ok.

%% On close, stop the periodic sync (if any) and close the underlying log
%% if we're configured to do so.
close(#dl_transport{log = Log, close_on_close = CloseLog, sync_tref = TRef}) ->
    cancel_sync_timer(TRef),
    close_log(CloseLog, Log).

%% Cancel the interval timer started by new/2, if there is one.
cancel_sync_timer(undefined) ->
    ok;
cancel_sync_timer(TRef) ->
    %% The result is ignored on purpose: a timer that is already gone
    %% must not prevent the transport from closing.
    _ = timer:cancel(TRef),
    ok.

%% Close the disk_log only when the transport was asked to.
close_log(false, _Log) ->
    ok;
close_log(true, Log) ->
    disk_log:lclose(Log).


%%%% FACTORY GENERATION %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%

%% Same as new_transport_factory/3 with transport options that close the
%% log together with the transport and sync it every 500 milliseconds.
new_transport_factory(Name, ExtraLogOpts) ->
    new_transport_factory(Name, ExtraLogOpts, [{close_on_close, true},
                                               {sync_every, 500}]).

%% Return {ok, Fun} where Fun/0 opens the disk_log called Name and wraps
%% it in a new transport configured with TransportOpts.
new_transport_factory(Name, ExtraLogOpts, TransportOpts) ->
    F = fun() -> factory_impl(Name, ExtraLogOpts, TransportOpts) end,
    {ok, F}.

%% Open an external-format wrap log (ExtraLogOpts are appended to the
%% options handed to disk_log:open/1) and attach a transport to it.
factory_impl(Name, ExtraLogOpts, TransportOpts) ->
    LogOpts = [{name, Name},
               {format, external},
               {type, wrap} |
               ExtraLogOpts],
    new(open_log(LogOpts), TransportOpts).

%% Open the log and return its name. A repaired log is reported and then
%% used like a freshly opened one; any other result crashes with
%% case_clause.
open_log(LogOpts) ->
    case disk_log:open(LogOpts) of
        {ok, Log} ->
            Log;
        {repaired, Log, Info1, Info2} ->
            error_logger:info_msg("Disk log ~p repaired: ~p, ~p~n",
                                  [Log, Info1, Info2]),
            Log
    end.