阅读 rml_rtmp 示例代码之一个简单的RTMP服务器(threaded_rtmp_server)


最近想找个简单的项目和代码了解下RTMP协议收发,在Github上翻到一个项目—— rust-media-libs,代码量不多,正好适合阅读学习,同时它使用 Rust 编写,能够帮助我更多的了解 Rust。

学习更底层的库之前,本文记录阅读其基础示例——“threaded_rtmp_server”, 这个示例代码位置在 rust-media-libs/examples/threaded_rtmp_server,代码行数 820 行。

# wc -l src/*.rs
  173 connection.rs
  111 main.rs
  536 server.rs
  820 total

测试

首先,进到示例目录运行示例

cargo run

推送视频,IP地址为示例运行的服务器

ffmpeg -re -i video.flv -vcodec copy -acodec copy -f flv -y rtmp://192.168.3.189:1935/live/livestream

推送后,使用VLC播放流媒体,播放地址就是推送地址

我这里测试示例运行良好且稳定,接下来可以“放心”的开始阅读源代码

开始吧!

打开 main.rs 入口文件

extern crate bytes;
extern crate slab;
extern crate rml_rtmp;

mod connection;
mod server;

use std::collections::{HashSet};
use std::net::{TcpListener, TcpStream};
use std::sync::mpsc::{channel, Receiver, TryRecvError};
use std::thread;
use slab::Slab;
use ::connection::{Connection, ConnectionError, ReadResult};
use ::server::{Server, ServerResult};

// ...

引用了一些标准库,三方模块依赖,及自己定义的模块。

在继续阅读代码之前,我决定把没用过的模块搜索了解一下。

依赖模块

slab 模块 —— https://docs.rs/slab/0.4.2/slab/

和 Vec 最大的不同是插入数据返回数据索引位置

extern crate slab;

use slab::Slab;

fn main() {
    let mut slab = Slab::new();

    let id = slab.insert("David");

    assert_eq!(slab[id], "David");

    slab[id] = "Join";
    assert_eq!(slab[id], "Join");
}

channel 模块 —— 用于进程间单向数据流发送

例子来自:https://doc.rust-lang.org/stable/rust-by-example/std_misc/channels.html

use std::sync::mpsc::{Sender, Receiver};
use std::sync::mpsc;
use std::thread;

static NTHREADS: i32 = 10;

fn main() {

    let (tx, rx): (Sender<i32>, Receiver<i32>) = mpsc::channel();
    let mut children = Vec::new();

    // 使用线程发送数据
    for id in 0..NTHREADS {
        let thread_tx = tx.clone();

        let child = thread::spawn(move || {
            thread_tx.send(id).unwrap();
            println!("thread {} finished", id);
        });

        children.push(child);
    }

    // 接收数据
    let mut ids = Vec::with_capacity(NTHREADS as usize);
    for _ in 0..NTHREADS {
        ids.push(rx.recv());
    }

    // 等待线程全部执行完成
    for child in children {
        child.join().expect("oops! the child thread paniced");
    }

    // 打印接收
    println!("{:?}", ids);
}

输出

thread 0 finished
thread 2 finished
thread 3 finished
thread 1 finished
thread 4 finished
thread 5 finished
thread 6 finished
thread 8 finished
thread 7 finished
thread 9 finished
[Ok(0), Ok(2), Ok(3), Ok(1), Ok(4), Ok(5), Ok(6), Ok(8), Ok(7), Ok(9)]

入口文件(main.rs)

扫清了拦路虎,继续阅读 main.rs 主线

fn main() {
    let address = "0.0.0.0:1935";
    let listener = TcpListener::bind(&address).unwrap();

    let (stream_sender, stream_receiver) = channel();
    thread::spawn(|| {handle_connections(stream_receiver)});

    println!("Listening for connections on {}", address);
    for stream in listener.incoming() {
        println!("New connection!");
        match stream_sender.send(stream.unwrap()) {
            Ok(_) => (),
            Err(error) => panic!("Error sending stream to connection handler: {:?}", error),
        }
    }
}

一个十分简洁的主函数,使用 TCP 绑定了 1935 端口用于监听数据,紧接着使用 channel 函数创建在线程间用于传输消息的接收者和发送者。使用 thread::spawn 创建一个线程,这个线程拥有接收者的所有权,用于接收 TCP 发送过来的媒体流数据。

之后的代码,主进程阻塞进入 TCP 监听,当接收到 TCP 数据时,使用消息发送者将数据发送给刚刚创建的线程进行处理。

主函数下边有一个 handle_connections() 函数,用于接收并解析消息。

fn handle_connections(connection_receiver: Receiver<TcpStream>) {
    let mut connections = Slab::new();
    let mut connection_ids = HashSet::new();
    let mut server = Server::new();

    loop {
        match connection_receiver.try_recv() {
            Err(TryRecvError::Disconnected) => panic!("Connection receiver closed"),
            Err(TryRecvError::Empty) => (),
            Ok(stream) => {
                let connection = Connection::new(stream);
                let id = connections.insert(connection);
                let connection = connections.get_mut(id).unwrap();
                connection.connection_id = Some(id);
                connection_ids.insert(id);

                println!("Connection {} started", id);
            }
        }

        // 其它代码

    }
}

函数首先初始化了一个 slab 变量,一个 hashset 集合,这个 Server 变量是? 到上边看看 use ::server::{Server, ServerResult}; 进去简单看一眼,原来是在同级目录 server.rs 文件定义的结构体。

暂时先不管它的具体实现。

connection_receiver.try_recv() 持续监听主进程发过来的数据,如果没有报错,进入Ok流程,使用 Connection::new() 方法来初始化一个连接,这个 Connection 结构体是同目录的 connection.rs 提供的。

先初始化一个结构体,将结构体插入到 slab 数组中,之后根据 id 从 slab 数组中再获取到这个结构体实例,值得主意的是,初始化的结构体和插入到数组中的结构体不是同一个,且看示例

let mut slab = Slab::new();

let data = 5;
println!("{:p}", &data);
let id = slab.insert(data);
let data = slab.get_mut(id).unwrap();
println!("{:p}", &data);

// 地址不同
// 0x7ffc209dca74
// 0x7ffc209dcac8

将结构体的内部 connection_id 赋值,同时也将 id 值插入到 connection_ids 集合中。

继续来看,声明了两个动态数组 ids_to_clear 与 packets_to_write 暂时不知道用来干什么,先跳过,之后遍历集合,从中获取到所有客户端连接的索引 id,根据 id 获取到连接信息,从连接中获取数据。

let mut ids_to_clear = Vec::new();
let mut packets_to_write = Vec::new();
for connection_id in &connection_ids {
    let connection = connections.get_mut(*connection_id).unwrap();
    match connection.read() {
        Err(ConnectionError::SocketClosed) => {
            println!("Socket closed for id {}", connection_id);
            ids_to_clear.push(*connection_id);
        },

        Err(error) => {
            println!("I/O error while reading connection {}: {:?}", connection_id, error);
            ids_to_clear.push(*connection_id);
        },

        Ok(result) => {
            match result {
                ReadResult::NoBytesReceived => (),
                ReadResult::HandshakingInProgress => (),
                ReadResult::BytesReceived {buffer, byte_count} => {
                    let mut server_results = match server.bytes_received(*connection_id, &buffer[..byte_count]) {
                        Ok(results) => results,
                        Err(error) => {
                            println!("Input caused the following server error: {}", error);
                            ids_to_clear.push(*connection_id);
                            continue;
                        },
                    };

                    for result in server_results.drain(..) {
                        match result {
                            ServerResult::OutboundPacket {target_connection_id, packet} => {
                                packets_to_write.push((target_connection_id, packet));
                            },

                            ServerResult::DisconnectConnection {connection_id: id_to_close} => {
                                ids_to_clear.push(id_to_close);
                            }
                        }
                    }
                }
            }
        }
    }
}

跳过两个错误处理分支,到达 Ok 分支后,表示从连接中获取到了数据,数据也分为几种类型,无数据,握手消息,其它数据,这里使用到了 server.bytes_received() 方法,具体的实现得稍后进到 server.rs 中看实现才能知道,此处知道它处理了消息就够了。

server 返回了 server_results 结果,匹配这个结构,如果是 OutboundPacket 类型,那么将目标 id 与 数据包存储到 packets_to_write 数组中,就是上边定义的那个。如果是断开连接的数据类型,那么将 id 加入到即将断开连接的数组中。

接下来的两个循环,遍历了 packets_to_write 与 ids_to_clear 数组,连接池中取出连接发送消息,移除清理断开连接列表中的连接信息。

for (connection_id, packet) in packets_to_write.drain(..) {
    let connection = connections.get_mut(connection_id).unwrap();
    connection.write(packet.bytes);
}

for closed_id in ids_to_clear {
    println!("Connection {} closed", closed_id);
    connection_ids.remove(&closed_id);
    connections.remove(closed_id);
    server.notify_connection_closed(closed_id);
}

回顾一下 main.rs 文件,共 110 行代码,没有涉及到数据的解析和处理,主要负责处理监听,创建线程处理消息,并实现了连接池的维护功能,即根据消息来创建连接或当解析消息异常及客户端主动断开连接时清理客户端信息,整体来看,简洁清晰,接下来时重头戏,看一下数据是如何解析的。

消息解析实现(server.rs)

这个一部分实现了服务器端逻辑处理,代码量就相对多些了,500多行,也不多。

以下摘录部分代码及伪代码来看下逻辑。

pub struct Server {
    clients: Slab<Client>,
    connection_to_client_map: HashMap<usize, usize>,
    channels: HashMap<String, MediaChannel>,
}

Server 是一个结构体,后续的各种方法是基于 Server 的实现,其中,clients 是 client 数组,client 也是一个结构体实例。

struct Client {
    session: ServerSession,
    current_action: ClientAction,
    connection_id: usize,
    has_received_video_keyframe: bool,
}

client 保存着客户端的 session(session 是更底层的模块生成维护的),状态,链接的 id 及是否收到关键帧标识。

接着看 Server 结构体的 connection_to_client_map 字段,它是一个字典,用来存储 connection_id 与 client 在 clients 中位置的对应关系,以便能够通过 connection_id 能够获取到客户端对应的 client 获取 session 及状态等。

main.rs 中看到引入 server.rsbytes_received() 方法进行监听,那么看看这个方法做了什么~

伪代码

fn bytes_received(connection_id, bytes):

    # 如果 connection_id 第一次连接
    if connect_id not in connection_to_client_map:

        # 0, 初始化一个服务器回执
        let server_results = []

        # 1, 调用底层库获取 Session(第一处)
        let session, session_results = ServerSession::new(config)

        # 2, 处理 Session 返回结果,比如有没有新的消息需要处理
        handle_session_results(connction_id, session_results, server_results)

        # 3, 初始化客户端
        client = Client { session, connection_id, ... }

        # 4, 将客户端存入 clients
        let client_id = clients.push(client)

        # 5, 将客户端 id 与连接 id 进行关联
        connection_to_client_map[connection_id] = client_id

    # 6, 获取到 client, 并调用 client.session.handle_input 处理数据(第二处)
    let client_results = client.session.handle_input(bytes)

    # 7, 处理 Session 返回结果
    handle_session_results(connction_id, client_results, server_results)

    # 返回结果
    return server_results

以上伪代码舍弃了一些语言细节,同时保留了变量的命名,扫两眼后,可以看到有两个地方很重要,一是 Session 的创建及结果,这也是后续阅读更底层源码需要搞清楚的地方,二是 handle_session_results() 函数执行了两次,它也是在同文件定义的

fn handle_session_results(&mut self,
                          executed_connection_id: usize,
                          session_results: Vec<ServerSessionResult>,
                          server_results: &mut Vec<ServerResult>) {
    for result in session_results {
        match result {
            ServerSessionResult::OutboundResponse(packet) => {
                server_results.push(ServerResult::OutboundPacket {
                    target_connection_id: executed_connection_id,
                    packet,
                })
            },

            ServerSessionResult::RaisedEvent(event) =>
                self.handle_raised_event(executed_connection_id, event, server_results),

            x => println!("Server result received: {:?}", x),
        }
    }
}

可以看到它的作用是对 session 返回的内容进行处理,如果 session 返回了 OutboundResponse(出栈消息),那么把它保存到 server_results,如果是 RaisedEvent(入栈事件),那么调用 handle_raised_event() 方法进行处理。

以下是 handle_raised_event() 的实现,可以看到,它处理了客户端的请求,比如 ConnectionRequested(连接请求),PublishStreamRequested(推送请求),PlayStreamRequested(播放请求)等等。

fn handle_raised_event(&mut self,
                       executed_connection_id: usize,
                       event: ServerSessionEvent,
                       server_results: &mut Vec<ServerResult>) {
    match event {
        ServerSessionEvent::ConnectionRequested {request_id, app_name} => {
            self.handle_connection_requested(executed_connection_id, request_id, app_name, server_results);
        },

        ServerSessionEvent::PublishStreamRequested {request_id, app_name, stream_key, mode: _} => {
            self.handle_publish_requested(executed_connection_id, request_id, app_name, stream_key, server_results);
        },

        ServerSessionEvent::PlayStreamRequested {request_id, app_name, stream_key, start_at: _, duration: _, reset: _, stream_id} => {
            self.handle_play_requested(executed_connection_id, request_id, app_name, stream_key, stream_id, server_results);
        },

        ... 省略 ...
        },

        _ => println!("Event raised by connection {}: {:?}", executed_connection_id, event),
    }
}

server.rs 后续的代码,就是 handle_raised_event() 方法中各项分支处理的细节实现了,再此就不每个都进行罗列了

处理连接(connection.rs)

在之前阅读的的 main.rs 中,当接收到 TCP 数据时,这里引用了 connection.rs 中的结构体

use ::connection::{Connection, ConnectionError, ReadResult};

...

let connection = Connection::new(stream);

看一下实现

 42     pub fn new(socket: TcpStream) -> Connection {
 43         let (byte_sender, byte_receiver) = channel();
 44         let (result_sender, result_receiver) = channel();
 45 
 46         start_byte_writer(byte_receiver, &socket);
 47         start_result_reader(result_sender, &socket);
 48 
 49         Connection {
 50             connection_id: None,
 51             writer: byte_sender,
 52             reader: result_receiver,
 53             handshake: Handshake::new(PeerType::Server),
 54             handshake_completed: false,
 55         }
 56     }

这里用 channel() 初始化了两个用于进程间数据传输的通道,start_byte_writer() 函数接收了 byte_receiver,第 51 行代码,初始化结构体的时候把配对的 byte_sender 赋值给了 writer,那就比较清晰了,connection 连接实例的 writer 方法,发送的数据进到了 start_byte_writer() 函数中,且看它做了什么。

117 fn start_byte_writer(byte_receiver: Receiver<Vec<u8>>, socket: &TcpStream) {
118     let mut socket = socket.try_clone().expect("failed to clone socket");
119     thread::spawn(move|| {
120         let mut send_queue = VecDeque::new();
121 
122         loop {
123             loop {
124                 match byte_receiver.try_recv() {
125                     Err(TryRecvError::Empty) => break,
126                     Err(TryRecvError::Disconnected) => return,
127                     Ok(bytes) => send_queue.push_back(bytes),
128                 }
129             }
130 
131             match send_queue.pop_front() {
132                 None => thread::sleep(Duration::from_millis(1)),
133                 Some(bytes) => match socket.write(&bytes) {
134                     Ok(_) => (),
135                     Err(error) => {
136                         println!("Error writing to socket: {:?}", error);
137                         return;
138                     }
139                 }
140             }
141         }
142     });
143 }

119 行,启动线程来进行处理,120 行初始化了一个 VecDeque 队列。

第 124 行,看到了刚才说的一对的 byte_receiver 数据接收者,它接收到数据,将数据追加到了队列的底部,接下来的代码,通过 send_queue.pop_front() 再从顶部取出数据,通过 socket 发送出去。

这样,更加清晰的了解了 connection 实例,当我们调用它的 writer 时,它负责把消息最终通过 TCP 发送出去。

同理,start_result_reader() 方法负责接收 TCP 传输过来的数据,接收后进行拼接处理,然后发送出去,由它配对的 result_receiver 进行接收。

145 fn start_result_reader(sender: Sender<ReadResult>, socket: &TcpStream) {
146     let mut socket = socket.try_clone().unwrap();
147     thread::spawn(move|| {
148         let mut buffer = [0; BUFFER_SIZE];
149         loop {
150             match socket.read(&mut buffer) {
151                 Ok(0) => return, // socket closed
152                 Ok(read_count) => {
153                     let mut send_buffer = [0; BUFFER_SIZE];
154                     for x in 0..read_count {
155                         send_buffer[x] = buffer[x];
156                     }
157 
158                     let result = ReadResult::BytesReceived {
159                         buffer: send_buffer,
160                         byte_count: read_count,
161                     };
162 
163                     sender.send(result).unwrap();
164                 },
165 
166                 Err(error) => {
167                     println!("Error occurred reading from socket: {:?}", error);
168                     return;
169                 }
170             }
171         }
172     });
173 }

Connection 结构体还实现了 read() 方法,它对 reader 进行了再次封装。

 62     pub fn read(&mut self) -> Result<ReadResult, ConnectionError> {
 63         match self.reader.try_recv() {
 64             Err(TryRecvError::Empty) => Ok(ReadResult::NoBytesReceived),
 65             Err(TryRecvError::Disconnected) => Err(ConnectionError::SocketClosed),
 66             Ok(result) => {
 67                 match self.handshake_completed {
 68                     true => Ok(result),
 69                     false => match result {
 70                         ReadResult::HandshakingInProgress => unreachable!(),
 71                         ReadResult::NoBytesReceived => Ok(result),
 72                         ReadResult::BytesReceived {buffer, byte_count}
 73                             => self.handle_handshake_bytes(&buffer[..byte_count]),
 74                     }
 75                 }
 76             }
 77         }
 78     }

再回到 main.rs 中

 55         for connection_id in &connection_ids {
 56             let connection = connections.get_mut(*connection_id).unwrap();
 57             match connection.read() {
 58                  ...
 59             }
 60         }

第 57 行,这里就是用的 read() 方法来获取 TCP 连接发送过来的原始数据,进一步处理后然后发送给了 Server 的 bytes_received() 方法进行处理。

另外,在 read() 实现中,判断了 TCP 握手是否完成,如果没完成则走一个握手流程,如果已经完成了 TCP 连接,那么数据就是业务的数据,可以简单处理后交给上一层处理。

在 connection.rs 中同样使用了更底层的 rml_rtmp 库中的方法来处理底层通信数据

use rml_rtmp::handshake::{Handshake, HandshakeProcessResult, PeerType};

其余的一些函数就是一些细节的处理等,抽时间需要再多看实现细节学习一下,总体来讲这个基于线程的 RTMP 服务器的框架梳理清晰了。

threaded_rtmp_server

遗留待研究

因为这个示例代码是基于更底层的 rml_rtmp 模块开发的,所以在阅读 rml_rtmp 之前,一定是有些函数,大概知道它们是做什么的,然而没看代码实现,不知道它的具体实现细节,在此进行记录,以便后续带着问题阅读源代码,并解决遗留问题。

1,ServerSessionConfig::new() 的如何实现,它都包含哪些数据?

2,ServerSession::new(config) 返回的 ServerSessionResult 数据结构如何,在消息处理中扮演怎样的角色?

3,Connection 中用到 rml_rtmp 库的 TCP 握手等处理函数,具体的代码实现怎样,值得学习。

目前看来短时间还阅读不到 rml_rtmp 的代码,因为还有个 demo 示例 —— mio_rtmp_server

熟悉完这个示例,然后再系统的过一遍 RTMP 协议再进行更底层的库阅读学习,今天先到这里。

参考

  1. 代码地址:https://github.com/KallDrexx/rust-media-libs/tree/master/examples/threaded_rtmp_server