`
argan
  • 浏览: 126194 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

读了一下hotwheels的代码

阅读更多

上次提到的一个挑战,公开出来的一些代码,这两天抽时间仔细读了一下,感觉对OTP了解更多了(以前没怎么接触过完整的应用),下面是一些阅读笔记和简单的分析

 

hotwheels

 

程序入口 hotwheels,启动app hotwheels,通过hotwheels.app查看入口模块,

app的入口模块hotwheels_app,作为一个application,定义了几个子进程:

* 模块hotwheels_acceptor,作为TCP Server,监听端口,接受请求,名字为hotwheels_sup

* 模块topic,作为topic管理器,hotwheels_topic_sup

* 启动自己作为一个supervisor,名字为hotwheels_transport_sup

** 定义了一个child,是transport

 

TCP Server

 

TCP Server是一个gen_server,处理掉了几乎所有的异常情况,包括打开连接过多(Too Many Open Files),这时候他会sleep一会儿,然后继续处理

 

TCP Server启动的时候开始监听端口,接受请求,当有请求进来的时候,就让Child Instance Supervisor(hotwheels_app)启动一个

Child(transport)来处理这个客户端,这段代码是:

 

handle_connection(State, Socket) ->
    {ok, Pid} = hotwheels_app:start_transport(State#state.port),
    ok = gen_tcp:controlling_process(Socket, Pid),
    %% Instruct the new handler to own the socket.
    (State#state.module):set_socket(Pid, Socket).
 

 

transport

 

transport负责处理客户端连接,为了更灵活的处理,socket都被设置成{active,once}(这个原理请见xxx的分析)。

 

transport也是一个gen_server,在启动的时候没有注册名字,因此可以启动n个实例(由application来启动)

因为socket是active once的,因此所有收到的数据都将以message的方式送来,都需要通过handle_info来处理,客户端会发三种消息来,

SUBSCRIBE,UNSUBSCRIBE,PUBLISH,处理这三种消息的时候都是先将socket重新设置成{active,once},然后将消息交给topic去处理

 

transport设置了trap_exit为true,抓住并忽略掉了处理进程(topic)失败的情况

 

如果收到ACK消息,则应答一个ACK消息,同时带上时间戳

 

如果收到不认识的binary数据,则直接将数据发送给客户端(为啥?)

 

topic

 

topic 负责是pub sub的入口,负责调度,如果要sub的topic不存则,则通知pubsub模块初始化一个新的,然后用mapper模块记录下来

真正的pub,sub操作是由pubsub模块完成的,每个topic都由一个单独的pubsub进程管理

 

mapper

 

mapper模块很简单,就是管理进程的Pid和Topic名字之间的隐射关系,同时维护双向的关系,pid2topic,topic2pid,建立两张ets表,

分别保存这两种隐射关系

 

pubsub

 

记录每个subscribors,记录的数据结构是{Pid,Ref,Sock},Pid是subscribor的pid,Ref是sub的时候monitor的reference,Sock

是该客户端连接的socket,消息进来(publish)的时候:

 

handle_cast({publish, Msg}, State) ->
    io:format("info: ~p~n", [ets:info(State#state.subs)]),
    {A, B, C} = Start = now(),
    Msg1 = <<A:32, B:32, C:32, ?MESSAGE, Msg/binary>>,
    F = fun({_, _, Sock}, _) -> erlang:port_command(Sock, Msg1) end,  %% 使用port_command来发送
    erlang:process_flag(priority, high),
    ets:foldl(F, ok, State#state.subs),  %% 遍历ets里的所有内容
    End = now(),
    erlang:process_flag(priority, normal),
    io:format("time: ~p~n", [timer:now_diff(End, Start) / 1000]),
    {noreply, State};
 

 

 

分享到:
评论
1 楼 mryufeng 2009-09-09  
温习这个代码确实能学到很多东西。

相关推荐

Global site tag (gtag.js) - Google Analytics