`
argan
  • 浏览: 126461 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

Erlang 基础学习 11 Programming with Sockets

阅读更多

Day 11 Programming with Sockets

• gen_tcp 和 gen_udp 两个包

使用TCP
• 从服务器取数据
• 1. {ok,Socket}=gen_tcp:connect(Host,Port,Params)
• 2. ok = gen_tcp:send(Socket,Data).
• 3.receive_data,处理两种包{tcp,Socket,Bin}和{tcp_closed,Socket}
  receive_data(Socket, SoFar) ->
      receive
         {tcp,Socket,Bin} ->
              receive_data(Socket, [Bin|SoFar]);  %% 注意,这里list里的数据的顺序是倒的,最后要lists:reverse一下
         {tcp_closed,Socket} ->
             list_to_binary(reverse(SoFar))
      end.
• 一个简单的TCP Server
  start_nano_server() ->
     {ok, Listen} = gen_tcp:listen(2345, [binary, {packet, 4},
                                           {reuseaddr, true},
                                           {active, true}]),
     {ok, Socket} = gen_tcp:accept(Listen),
     gen_tcp:close(Listen),
      loop(Socket).
• 这个server很简单,监听端口,接受一次请求,然后处理那个请求
• {ok,Listen}=gen_tcp:listen(Port,Params).
• {ok,Socket}=gen_tcp:accept(Listen).
• 然后这个Socket就一样的处理了
• 改进一下,可以处理多次请求,只是简单的在处理完请求之后继续接收新的连接
start_seq_server() ->
    {ok, Listen} = gen_tcp:listen(...),
    seq_loop(Listen).
seq_loop(Listen) ->
    {ok, Socket} = gen_tcp:accept(Listen),
    loop(Socket),
    seq_loop(Listen).
• 其他的改进,可以同时处理多个请求,接收连接之后,但前进程处理请求,新开一个进程去等待连接
start_parallel_server() ->
    {ok, Listen} = gen_tcp:listen(...),
    spawn(fun() -> par_connect(Listen) end).
par_connect(Listen) ->
    {ok, Socket} = gen_tcp:accept(Listen),
    spawn(fun() -> par_connect(Listen) end),
    loop(Socket).

• 打开Socket的进程被称为控制进程,socket收到的消息都会发给控制进程,如果控制进程死掉了,那这个socket也会被关掉,可以通过gen_tcp:controlling_process(Socket, NewPid).来修改一个socket的控制进程
• 上面的并发处理的server有一个问题,有可能打开无数个处理进程,解决办法就是可以加个计数器
• 接收一个请求,创建socket之后,最好是明确的设定这个socket的一些参数,例如
{ok, Socket} = gen_tcp:accept(Listen),
inet:setopts(Socket, [{packet,4},binary,
                      {nodelay,true},{active, true}]),
loop(Socket)
• R11B-3 版本之后,erlang允许多个进程同时监听同一个Listen的socket,这样就可以实现创建一个pool来处理连接了

Control Issues
• erlang 的socket可以以三种模式打开active,active once,passive,可以在gen_tcp:listen,或者gen_tcp:connect的时候设置参数 {active, true | false | once}
• active socket,当数据来的时候,控制进程会收到{tcp, Socket, Data}消息,控制进程无法控制什么时候收到这些消息,完全被动的,nonblocking
• passive socket,控制进程必须使用gen_tcp:recv(Socket, N)明确的指定从Socket中读取N个字节,(如果N为0的话,所有可用的数据都会读出来),读取的时候是需要等待的,这样,服务器就可以完全控制什么时候读取数据,blocking

Active Message Reception (Nonblocking)

{ok, Listen} = gen_tcp:listen(Port, [..,{active, true}...]),
{ok, Socket} = gen_tcp:accept(Listen),
loop(Socket).

loop(Socket) ->
    receive
        {tcp, Socket, Data} ->
            ... do something with the data ...
        {tcp_closed, Socket} ->
            ...
    end.
• 如果客户端不停的发送消息,同时服务端处理的速度跟不上的话,服务器端将会被撑死,这就是flood攻击
• 因为服务器端无法block住客户端,因此称之为nonblocking server
• 只有在确信能支撑起客户端的需要的时候才使用这种方式

Passive Message Reception (Blocking)

{ok, Listen} = gen_tcp:listen(Port, [..,{active, false}...]),
{ok, Socket} = gen_tcp:accept(Listen),
loop(Socket).
loop(Socket) ->
    case gen_tcp:recv(Socket, N) of
        {ok, B} ->
            ... do something with the data ...
            loop(Socket);
        {error, closed}
            ...
    end.

• 服务器端可以自由的控制在需要的时候才调用recv,因此在此之前客户端将被block住(操作系统的buffer除外)
• 因此服务器端很安全,至少程序不至于被客户端疯狂的发信息而撑死(操作系统撑死除外)
• 这种阻塞模式服务器也只能等待一个socket的数据

The Hybrid Approach (Partial Blocking)
{ok, Listen} = gen_tcp:listen(Port, [..,{active, once}...]),
{ok, Socket} = gen_tcp:accept(Listen),
loop(Socket).
loop(Socket) ->
    receive
        {tcp, Socket, Data} ->
            ... do something with the data ...
            %% when you're ready enable the next message
            inet:setopts(Sock, [{active, once}]),
            loop(Socket);
         {tcp_closed, Socket} ->
            ...
    end.
• 我们使用{active,once},这样收到消息之后,socket变成passive模式,等我们处理完消息之后,再设置socket为{active,once}
• 这样,服务端不会被客户端的数据撑死了,同时也能处理很多连接的请求了

• @spec inet:peername(Socket) -> {ok, {IP_Address, Port}} | {error, Why}
• 可以查看Socket的另一边是谁
• IP_Address的格式{N1,N2,N3,N4},{K1,K2,K3...K8}分别支持IPV4,IPV6,这里的N和K都是0-255的整数

Socket编程的错误处理
• 不需要错误处理!
• 如果控制进程死掉了,相应的socket就会断开,另一端会收到消息{tcp_closed,Socket}

UDP
• UDP因为是无连接的协议,因此编程模型很简单,最简单的服务器
server(Port) ->
{ok, Socket} = gen_udp:open(Port, [binary]),
loop(Socket).
loop(Socket) ->
    receive
       {udp, Socket, Host, Port, Bin} ->
           BinReply = ... ,
           gen_udp:send(Socket, Host, Port, BinReply),
           loop(Socket)
    end.
• 客户端也很简单
client(Request) ->
    {ok, Socket} = gen_udp:open(0, [binary]),
    ok = gen_udp:send(Socket, "localhost" , 4000, Request),
    Value = receive
               {udp, Socket, _, _, Bin} ->
                    {ok, Bin}
            after 2000 ->
               error
            end,
    gen_udp:close(Socket),
    Value.

Additional Notes on UDP
• 由于UDP是无连接的,因此有可能同一个包会发送两次,这时就分不清楚到底有没有接收到过这个包了
• 要解决的话,可以在请求的包里加上一个唯一标志,然后让服务端也返回这个标志,这样,接收的时候就可以检查是不是相应的回答了,不过如果那种没有回答的请求呢?
• 可以使用make_ref()来获得一个erlang系统内唯一的标志

Broadcasting to Multiple Machines

-module(broadcast).
-compile(export_all).
send(IoList) ->
    case inet:ifget("eth0" , [broadaddr]) of
        {ok, [{broadaddr, Ip}]} ->
            {ok, S} = gen_udp:open(5010, [{broadcast, true}]),
            gen_udp:send(S, Ip, 6000, IoList),
            gen_udp:close(S);
        _ ->
            io:format("Bad interface name, or\n"
                      "broadcasting not supported\n" )
    end.
listen() ->
    {ok, S} = gen_udp:open(6000),
    loop(S).
loop(S) ->
    receive
        Any ->
            io:format("received:~p~n" , [Any]),
            loop(S)
    end.
• 注意广播需要网络支持
• 一般的路由器会屏蔽跨子网的udp广播包

TODO 阅读SHOUTcast server代码


分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics