From 26c2cfc952e5f19ce102e944039525135a5ce9ea Mon Sep 17 00:00:00 2001 From: Jakub Pryc <94321002+Noarkhh@users.noreply.github.com> Date: Tue, 14 May 2024 12:43:24 +0200 Subject: [PATCH] Remove transport abstraction (#38) * Remove Transport behaviour * Improve socket control transfer * Separate getting the socket and getting control * Merge session/logic.ex with rtsp.ex * Remove unnecessary moduledocs --- README.md | 7 - lib/membrane_rtsp/rtsp.ex | 376 +++++++++++++----- lib/membrane_rtsp/server/conn.ex | 5 +- lib/membrane_rtsp/server/logic.ex | 7 +- lib/membrane_rtsp/session/logic.ex | 180 --------- lib/membrane_rtsp/tcp_socket.ex | 58 +++ lib/membrane_rtsp/transport/tcp_socket.ex | 74 ---- lib/membrane_rtsp/transport/transport.ex | 51 --- .../session/session_integration_test.exs | 7 +- .../session/session_logic_test.exs | 72 ++-- .../workflow_integration_test.exs | 121 +++--- test/support/fake.ex | 33 -- 12 files changed, 447 insertions(+), 544 deletions(-) delete mode 100644 lib/membrane_rtsp/session/logic.ex create mode 100644 lib/membrane_rtsp/tcp_socket.ex delete mode 100644 lib/membrane_rtsp/transport/tcp_socket.ex delete mode 100644 lib/membrane_rtsp/transport/transport.ex delete mode 100644 test/support/fake.ex diff --git a/README.md b/README.md index af0ea26..65e15bc 100644 --- a/README.md +++ b/README.md @@ -59,13 +59,6 @@ by calling: RTSP.close(session) ``` - -## Implementing a custom transport layer - -To implement custom request execution logic you must implement -`Membrane.RTSP.Transport` behavior. Then you can pass -the name of your transport module to `Membrane.RTSP.start_link/3`. - ## External tests Tests that use external RTSP service are disabled by default but they are present diff --git a/lib/membrane_rtsp/rtsp.ex b/lib/membrane_rtsp/rtsp.ex index ba15bb5..64ef4f5 100644 --- a/lib/membrane_rtsp/rtsp.ex +++ b/lib/membrane_rtsp/rtsp.ex @@ -1,53 +1,144 @@ defmodule Membrane.RTSP do - @moduledoc "RTSP Session" + @moduledoc """ + Functions for interfacing with a RTSP session + """ use GenServer - import Membrane.RTSP.Logic - alias Membrane.RTSP - alias Membrane.RTSP.Logic.State - alias Membrane.RTSP.{Request, Response} + alias Membrane.RTSP.{Request, Response, TCPSocket} @type t() :: pid() + @type options() :: [option()] + @type option() :: + {:connection_timeout, non_neg_integer()} | {:response_timeout, non_neg_integer()} + @default_rtsp_port 554 + @user_agent "MembraneRTSP/#{Mix.Project.config()[:version]} (Membrane Framework RTSP Client)" + + defmodule State do + @moduledoc false + @enforce_keys [:socket, :uri] + defstruct @enforce_keys ++ + [ + :response_timeout, + :session_id, + cseq: 0, + auth: nil + ] + + @type digest_opts() :: %{ + realm: String.t() | nil, + nonce: String.t() | nil + } + + @type auth() :: nil | :basic | {:digest, digest_opts()} + + @type t :: %__MODULE__{ + socket: :gen_tcp.socket(), + cseq: non_neg_integer(), + uri: URI.t(), + session_id: binary() | nil, + auth: auth(), + response_timeout: non_neg_integer() + } + end @doc """ - Starts and links session process. - - Sets following properties of Session: - * transport - information for executing request over the network. For - reference see `Membrane.RTSP.Transport` - * url - a base path for requests - * options - a keyword list that shall be passed when executing request over - transport + Starts and links session process with given URL as a base path for requests. """ - @spec start_link(binary(), module() | URI.t(), Keyword.t()) :: GenServer.on_start() - def start_link(url, transport \\ Membrane.RTSP.Transport.TCPSocket, options \\ []) do - do_start(url, transport, options, &GenServer.start_link/2) + @spec start_link(binary() | URI.t(), options()) :: GenServer.on_start() + def start_link(url, options \\ []) do + do_start(url, options, &GenServer.start_link/2) end - @spec start(binary(), module() | URI.t(), Keyword.t()) :: GenServer.on_start() - def start(url, transport \\ Membrane.RTSP.Transport.TCPSocket, options \\ []) do - do_start(url, transport, options, &GenServer.start/2) + @doc """ + Same as start_link/2, but doesn't link the session process. + """ + @spec start(binary() | URI.t(), Keyword.t()) :: GenServer.on_start() + def start(url, options \\ []) do + do_start(url, options, &GenServer.start/2) end - defp do_start(url, transport, options, start_fun) do - case URI.parse(url) do - %URI{host: host, scheme: "rtsp"} = url when is_binary(host) -> - start_fun.(__MODULE__, %{ - transport: transport, - url: %URI{url | port: url.port || @default_rtsp_port}, - options: options - }) + @spec request(pid(), binary(), RTSP.headers(), binary(), nil | binary()) :: Response.result() + def request(session, method, headers \\ [], body \\ "", path \\ nil) do + request = %Request{method: method, headers: headers, body: body, path: path} + GenServer.call(session, {:execute, request}, :infinity) + end - _else -> - {:error, :invalid_url} - end + @spec request_no_response(pid(), binary(), RTSP.headers(), binary(), nil | binary()) :: :ok + def request_no_response(session, method, headers \\ [], body \\ "", path \\ nil) do + request = %Request{method: method, headers: headers, body: body, path: path} + GenServer.cast(session, {:execute, request}) + end + + @spec close(pid()) :: :ok + def close(session), do: GenServer.cast(session, :terminate) + + @spec transfer_socket_control(t(), pid()) :: + :ok | {:error, :closed | :not_owner | :badarg | :inet.posix()} + def transfer_socket_control(session, new_controlling_process) do + GenServer.call(session, {:transfer_socket_control, new_controlling_process}) + end + + @spec get_socket(t()) :: :gen_tcp.socket() + def get_socket(session) do + GenServer.call(session, :get_socket) end + @type headers :: [{binary(), binary()}] + + @spec handle_response(t(), binary()) :: Response.result() + def handle_response(session, raw_response), + do: GenServer.call(session, {:parse_response, raw_response}) + + @spec get_parameter_no_response(t(), headers(), binary()) :: :ok + def get_parameter_no_response(session, headers \\ [], body \\ ""), + do: request_no_response(session, "GET_PARAMETER", headers, body) + + @spec play_no_response(t(), headers()) :: :ok + def play_no_response(session, headers \\ []), + do: request_no_response(session, "PLAY", headers, "") + + @spec describe(t(), headers()) :: Response.result() + def describe(session, headers \\ []), do: request(session, "DESCRIBE", headers, "") + + @spec announce(t(), headers(), binary()) :: Response.result() + def announce(session, headers \\ [], body \\ ""), + do: request(session, "ANNOUNCE", headers, body) + + @spec get_parameter(t(), headers(), binary()) :: Response.result() + def get_parameter(session, headers \\ [], body \\ ""), + do: request(session, "GET_PARAMETER", headers, body) + + @spec options(t(), headers()) :: Response.result() + def options(session, headers \\ []), do: request(session, "OPTIONS", headers) + + @spec pause(t(), headers()) :: Response.result() + def pause(session, headers \\ []), do: request(session, "PAUSE", headers) + + @spec play(t(), headers()) :: Response.result() + def play(session, headers \\ []), do: request(session, "PLAY", headers, "") + + @spec record(t(), headers()) :: Response.result() + def record(session, headers \\ []), do: request(session, "RECORD", headers) + + @spec setup(t(), binary(), headers()) :: Response.result() + def setup(session, path, headers \\ []), do: request(session, "SETUP", headers, "", path) + + @spec set_parameter(t(), headers(), binary()) :: Response.result() + def set_parameter(session, headers \\ [], body \\ ""), + do: request(session, "SET_PARAMETER", headers, body) + + @spec teardown(t(), headers()) :: Response.result() + @spec teardown(pid()) :: {:error, atom()} | {:ok, Membrane.RTSP.Response.t()} + def teardown(session, headers \\ []), do: request(session, "TEARDOWN", headers) + + @spec user_agent() :: binary() + def user_agent(), do: @user_agent + @impl true - def init(%{url: url, options: options, transport: transport_module}) do + def init(%{url: url, options: options}) do auth_type = case url do %URI{userinfo: nil} -> nil @@ -56,12 +147,11 @@ defmodule Membrane.RTSP do %URI{userinfo: info} when is_binary(info) -> :basic end - with {:ok, transport} <- transport_module.init(url, options) do + with {:ok, socket} <- TCPSocket.connect(url, options[:connection_timeout]) do state = %State{ - transport: transport, - transport_module: transport_module, + socket: socket, uri: url, - execution_options: options, + response_timeout: options[:response_timeout], auth: auth_type } @@ -81,10 +171,21 @@ defmodule Membrane.RTSP do end end - def handle_call(:get_transport, _from, %State{transport: transport} = state) do - {:reply, transport, state} + @impl true + def handle_call( + {:transfer_socket_control, new_controlling_process}, + _from, + %State{socket: socket} = state + ) do + {:reply, :gen_tcp.controlling_process(socket, new_controlling_process), state} + end + + @impl true + def handle_call(:get_socket, _from, %State{socket: socket} = state) do + {:reply, socket, state} end + @impl true def handle_call({:parse_response, raw_response}, _from, state) do with {:ok, response, state} <- parse_response(raw_response, state) do {:reply, {:ok, response}, state} @@ -98,100 +199,181 @@ defmodule Membrane.RTSP do {:stop, :normal, state} end + @impl true def handle_cast({:execute, request}, %State{cseq: cseq} = state) do case execute(request, state, false) do :ok -> {:noreply, %State{state | cseq: cseq + 1}} {:error, reason} -> - raise "Error: #{reason}" + raise "Error executing request #{inspect(request)}, reason: #{inspect(reason)}" end end @impl true - # this might be a message for transport layer. Redirect - def handle_info(msg, %State{} = state) do - state.transport_module.handle_info(msg, state.transport) - |> translate(:transport, state) + def handle_info({:tcp_closed, _socket}, state) do + {:stop, :socket_closed, state} end @impl true def terminate(_reason, state) do - state.transport_module.close(state.transport) + TCPSocket.close(state.socket) end - @spec request(pid(), binary(), RTSP.headers(), binary(), nil | binary()) :: Response.result() - def request(session, method, headers \\ [], body \\ "", path \\ nil) do - request = %Request{method: method, headers: headers, body: body, path: path} - GenServer.call(session, {:execute, request}, :infinity) - end + @spec do_start(URI.t(), options(), (module(), any() -> GenServer.on_start())) :: + GenServer.on_start() + defp do_start(url, options, start_fun) do + case URI.parse(url) do + %URI{host: host, scheme: "rtsp"} = url when is_binary(host) -> + start_fun.(__MODULE__, %{ + url: %URI{url | port: url.port || @default_rtsp_port}, + options: options + }) - @spec request_no_response(pid(), binary(), RTSP.headers(), binary(), nil | binary()) :: :ok - def request_no_response(session, method, headers \\ [], body \\ "", path \\ nil) do - request = %Request{method: method, headers: headers, body: body, path: path} - GenServer.cast(session, {:execute, request}) + _else -> + {:error, :invalid_url} + end end - @spec close(pid()) :: :ok - def close(session), do: GenServer.cast(session, :terminate) - - defp translate({action, new_state}, key, state) do - {action, Map.put(state, key, new_state)} + @spec execute(Request.t(), State.t(), boolean()) :: + :ok | {:ok, binary()} | {:error, reason :: any()} + defp execute(request, state, get_response \\ true) do + %State{ + cseq: cseq, + socket: socket, + uri: uri, + session_id: session_id, + response_timeout: response_timeout + } = state + + request + |> inject_session_header(session_id) + |> inject_content_length() + |> Request.with_header("CSeq", cseq |> to_string()) + |> Request.with_header("User-Agent", @user_agent) + |> apply_credentials(uri, state.auth) + |> Request.stringify(uri) + |> TCPSocket.execute(socket, response_timeout, get_response) end - defp translate({action, reply, new_state}, key, state) do - {action, reply, Map.put(state, key, new_state)} + @spec inject_session_header(Request.t(), binary()) :: Request.t() + defp inject_session_header(request, session_id) do + case session_id do + nil -> request + session -> Request.with_header(request, "Session", session) + end end - @type headers :: [{binary(), binary()}] - - @spec get_transport(t()) :: any() - def get_transport(session) do - GenServer.call(session, :get_transport) + @spec inject_content_length(Request.t()) :: Request.t() + defp inject_content_length(request) do + case request.body do + "" -> request + body -> Request.with_header(request, "Content-Length", to_string(byte_size(body))) + end end - @spec get_parameter_no_response(t(), headers(), binary()) :: :ok - def get_parameter_no_response(session, headers \\ [], body \\ ""), - do: request_no_response(session, "GET_PARAMETER", headers, body) - - @spec play_no_response(t(), headers()) :: :ok - def play_no_response(session, headers \\ []), - do: request_no_response(session, "PLAY", headers, "") + @spec apply_credentials(Request.t(), URI.t(), State.auth()) :: Request.t() + defp apply_credentials(request, %URI{userinfo: nil}, _auth_options), do: request - @spec handle_response(t(), binary()) :: Response.result() - def handle_response(session, raw_response), - do: GenServer.call(session, {:parse_response, raw_response}) + defp apply_credentials(%Request{headers: headers} = request, uri, auth) do + case List.keyfind(headers, "Authorization", 0) do + {"Authorization", _value} -> + request - @spec describe(t(), headers()) :: Response.result() - def describe(session, headers \\ []), do: request(session, "DESCRIBE", headers, "") + _else -> + do_apply_credentials(request, uri, auth) + end + end - @spec announce(t(), headers(), binary()) :: Response.result() - def announce(session, headers \\ [], body \\ ""), - do: request(session, "ANNOUNCE", headers, body) + @spec parse_response(binary(), State.t()) :: + {:ok, Response.t(), State.t()} | {:error, reason :: any()} + defp parse_response(raw_response, state) do + with {:ok, parsed_response} <- Response.parse(raw_response), + {:ok, state} <- handle_session_id(parsed_response, state), + {:ok, state} <- detect_authentication_type(parsed_response, state) do + state = %State{state | cseq: state.cseq + 1} + {:ok, parsed_response, state} + end + end - @spec get_parameter(t(), headers(), binary()) :: Response.result() - def get_parameter(session, headers \\ [], body \\ ""), - do: request(session, "GET_PARAMETER", headers, body) + @spec do_apply_credentials(Request.t(), URI.t(), State.auth()) :: Request.t() + defp do_apply_credentials(request, %URI{userinfo: info}, :basic) do + encoded = Base.encode64(info) + Request.with_header(request, "Authorization", "Basic " <> encoded) + end - @spec options(t(), headers()) :: Response.result() - def options(session, headers \\ []), do: request(session, "OPTIONS", headers) + defp do_apply_credentials(request, %URI{} = uri, {:digest, options}) do + encoded = encode_digest(request, uri, options) + Request.with_header(request, "Authorization", encoded) + end - @spec pause(t(), headers()) :: Response.result() - def pause(session, headers \\ []), do: request(session, "PAUSE", headers) + defp do_apply_credentials(request, _url, _options) do + request + end - @spec play(t(), headers()) :: Response.result() - def play(session, headers \\ []), do: request(session, "PLAY", headers, "") + @spec encode_digest(Request.t(), URI.t(), State.digest_opts()) :: String.t() + defp encode_digest(request, %URI{userinfo: userinfo} = uri, options) do + [username, password] = String.split(userinfo, ":", parts: 2) + encoded_uri = Request.process_uri(request, uri) + ha1 = md5([username, options.realm, password]) + ha2 = md5([request.method, encoded_uri]) + response = md5([ha1, options.nonce, ha2]) + + Enum.join( + [ + "Digest", + ~s(username="#{username}",), + ~s(realm="#{options.realm}",), + ~s(nonce="#{options.nonce}",), + ~s(uri="#{encoded_uri}",), + ~s(response="#{response}") + ], + " " + ) + end - @spec record(t(), headers()) :: Response.result() - def record(session, headers \\ []), do: request(session, "RECORD", headers) + @spec md5([String.t()]) :: String.t() + defp md5(value) do + value + |> Enum.join(":") + |> IO.iodata_to_binary() + |> :erlang.md5() + |> Base.encode16(case: :lower) + end - @spec setup(t(), binary(), headers()) :: Response.result() - def setup(session, path, headers \\ []), do: request(session, "SETUP", headers, "", path) + # Some responses do not have to return the Session ID + # If it does return one, it needs to match one stored in the state. + @spec handle_session_id(Response.t(), State.t()) :: {:ok, State.t()} | {:error, reason :: any()} + defp handle_session_id(%Response{} = response, state) do + with {:ok, session_value} <- Response.get_header(response, "Session") do + [session_id | _rest] = String.split(session_value, ";") + + case state do + %State{session_id: nil} -> {:ok, %State{state | session_id: session_id}} + %State{session_id: ^session_id} -> {:ok, state} + _else -> {:error, :invalid_session_id} + end + else + {:error, :no_such_header} -> {:ok, state} + end + end - @spec set_parameter(t(), headers(), binary()) :: Response.result() - def set_parameter(session, headers \\ [], body \\ ""), - do: request(session, "SET_PARAMETER", headers, body) + # Checks for the `nonce` and `realm` values in the `WWW-Authenticate` header. + # if they exist, sets `type` to `{:digest, opts}` + @spec detect_authentication_type(Response.t(), State.t()) :: {:ok, State.t()} + defp detect_authentication_type(%Response{} = response, state) do + with {:ok, "Digest " <> digest} <- Response.get_header(response, "WWW-Authenticate") do + [_match, nonce] = Regex.run(~r/nonce=\"(?.*)\"/U, digest) + [_match, realm] = Regex.run(~r/realm=\"(?.*)\"/U, digest) + auth_options = {:digest, %{nonce: nonce, realm: realm}} + {:ok, %{state | auth: auth_options}} + else + # non digest auth? + {:ok, _non_digest} -> + {:ok, %{state | auth: :basic}} - @spec teardown(t(), headers()) :: Response.result() - def teardown(session, headers \\ []), do: request(session, "TEARDOWN", headers) + {:error, :no_such_header} -> + {:ok, state} + end + end end diff --git a/lib/membrane_rtsp/server/conn.ex b/lib/membrane_rtsp/server/conn.ex index ec18c2a..6f026c2 100644 --- a/lib/membrane_rtsp/server/conn.ex +++ b/lib/membrane_rtsp/server/conn.ex @@ -1,8 +1,5 @@ defmodule Membrane.RTSP.Server.Conn do - @moduledoc """ - A module representing a client-server connection - """ - + @moduledoc false use GenServer require Logger diff --git a/lib/membrane_rtsp/server/logic.ex b/lib/membrane_rtsp/server/logic.ex index 85aac9a..f903dbf 100644 --- a/lib/membrane_rtsp/server/logic.ex +++ b/lib/membrane_rtsp/server/logic.ex @@ -1,8 +1,5 @@ defmodule Membrane.RTSP.Server.Logic do - @moduledoc """ - Logic for RTSP Server - """ - + @moduledoc false import Mockery.Macro alias Membrane.RTSP.{Request, Response, Server} @@ -11,7 +8,7 @@ defmodule Membrane.RTSP.Server.Logic do @allowed_methods ["GET_PARAMETER", "OPTIONS", "DESCRIBE", "SETUP", "PLAY", "PAUSE", "TEARDOWN"] defmodule State do - @moduledoc "Struct representing the state of a server connection" + @moduledoc false @enforce_keys [:socket, :request_handler] defstruct @enforce_keys ++ [ diff --git a/lib/membrane_rtsp/session/logic.ex b/lib/membrane_rtsp/session/logic.ex deleted file mode 100644 index a4f004b..0000000 --- a/lib/membrane_rtsp/session/logic.ex +++ /dev/null @@ -1,180 +0,0 @@ -defmodule Membrane.RTSP.Logic do - @moduledoc """ - Logic for RTSP session - """ - alias Membrane.RTSP.{Request, Response} - @user_agent "MembraneRTSP/#{Mix.Project.config()[:version]} (Membrane Framework RTSP Client)" - - defmodule State do - @moduledoc "Struct representing the state of RTSP session" - @enforce_keys [:transport, :uri, :transport_module] - defstruct @enforce_keys ++ - [ - :session_id, - cseq: 0, - execution_options: [], - auth: nil - ] - - @type digest_opts() :: %{ - realm: String.t() | nil, - nonce: String.t() | nil - } - - @type auth_t() :: nil | :basic | {:digest, digest_opts()} - - @type t :: %__MODULE__{ - transport: any(), - cseq: non_neg_integer(), - uri: URI.t(), - session_id: binary() | nil, - auth: auth_t(), - execution_options: Keyword.t() - } - end - - @spec user_agent() :: binary() - def user_agent(), do: @user_agent - - @spec execute(Request.t(), State.t(), boolean()) :: - :ok | {:ok, binary()} | {:error, reason :: any()} - def execute(request, state, get_response \\ true) do - %State{ - cseq: cseq, - transport: transport, - transport_module: transport_module, - uri: uri, - session_id: session_id, - execution_options: execution_options - } = state - - request - |> inject_session_header(session_id) - |> inject_content_length() - |> Request.with_header("CSeq", cseq |> to_string()) - |> Request.with_header("User-Agent", @user_agent) - |> apply_credentials(uri, state.auth) - |> Request.stringify(uri) - |> transport_module.execute(transport, execution_options ++ [get_response: get_response]) - end - - @spec inject_session_header(Request.t(), binary()) :: Request.t() - def inject_session_header(request, session_id) do - case session_id do - nil -> request - session -> Request.with_header(request, "Session", session) - end - end - - @spec inject_content_length(Request.t()) :: Request.t() - def inject_content_length(request) do - case request.body do - "" -> request - body -> Request.with_header(request, "Content-Length", to_string(byte_size(body))) - end - end - - @spec apply_credentials(Request.t(), URI.t(), State.auth_t()) :: Request.t() - def apply_credentials(request, %URI{userinfo: nil}, _auth_options), do: request - - def apply_credentials(%Request{headers: headers} = request, uri, auth) do - case List.keyfind(headers, "Authorization", 0) do - {"Authorization", _value} -> - request - - _else -> - do_apply_credentials(request, uri, auth) - end - end - - @spec parse_response(binary(), State.t()) :: - {:ok, Response.t(), State.t()} | {:error, reason :: any()} - def parse_response(raw_response, state) do - with {:ok, parsed_response} <- Response.parse(raw_response), - {:ok, state} <- handle_session_id(parsed_response, state), - {:ok, state} <- detect_authentication_type(parsed_response, state) do - state = %State{state | cseq: state.cseq + 1} - {:ok, parsed_response, state} - end - end - - defp do_apply_credentials(request, %URI{userinfo: info}, :basic) do - encoded = Base.encode64(info) - Request.with_header(request, "Authorization", "Basic " <> encoded) - end - - defp do_apply_credentials(request, %URI{} = uri, {:digest, options}) do - encoded = encode_digest(request, uri, options) - Request.with_header(request, "Authorization", encoded) - end - - defp do_apply_credentials(request, _url, _options) do - request - end - - @spec encode_digest(Request.t(), URI.t(), State.digest_opts()) :: String.t() - def encode_digest(request, %URI{userinfo: userinfo} = uri, options) do - [username, password] = String.split(userinfo, ":", parts: 2) - encoded_uri = Request.process_uri(request, uri) - ha1 = md5([username, options.realm, password]) - ha2 = md5([request.method, encoded_uri]) - response = md5([ha1, options.nonce, ha2]) - - Enum.join( - [ - "Digest", - ~s(username="#{username}",), - ~s(realm="#{options.realm}",), - ~s(nonce="#{options.nonce}",), - ~s(uri="#{encoded_uri}",), - ~s(response="#{response}") - ], - " " - ) - end - - @spec md5([String.t()]) :: String.t() - def md5(value) do - value - |> Enum.join(":") - |> IO.iodata_to_binary() - |> :erlang.md5() - |> Base.encode16(case: :lower) - end - - # Some responses do not have to return the Session ID - # If it does return one, it needs to match one stored in the state. - @spec handle_session_id(Response.t(), State.t()) :: {:ok, State.t()} | {:error, reason :: any()} - def handle_session_id(%Response{} = response, state) do - with {:ok, session_value} <- Response.get_header(response, "Session") do - [session_id | _rest] = String.split(session_value, ";") - - case state do - %State{session_id: nil} -> {:ok, %State{state | session_id: session_id}} - %State{session_id: ^session_id} -> {:ok, state} - _else -> {:error, :invalid_session_id} - end - else - {:error, :no_such_header} -> {:ok, state} - end - end - - # Checks for the `nonce` and `realm` values in the `WWW-Authenticate` header. - # if they exist, sets `type` to `{:digest, opts}` - @spec detect_authentication_type(Response.t(), State.t()) :: {:ok, State.t()} - def detect_authentication_type(%Response{} = response, state) do - with {:ok, "Digest " <> digest} <- Response.get_header(response, "WWW-Authenticate") do - [_match, nonce] = Regex.run(~r/nonce=\"(?.*)\"/U, digest) - [_match, realm] = Regex.run(~r/realm=\"(?.*)\"/U, digest) - auth_options = {:digest, %{nonce: nonce, realm: realm}} - {:ok, %{state | auth: auth_options}} - else - # non digest auth? - {:ok, _non_digest} -> - {:ok, %{state | auth: :basic}} - - {:error, :no_such_header} -> - {:ok, state} - end - end -end diff --git a/lib/membrane_rtsp/tcp_socket.ex b/lib/membrane_rtsp/tcp_socket.ex new file mode 100644 index 0000000..9be649e --- /dev/null +++ b/lib/membrane_rtsp/tcp_socket.ex @@ -0,0 +1,58 @@ +defmodule Membrane.RTSP.TCPSocket do + @moduledoc false + import Mockery.Macro + + @connection_timeout 1000 + @response_timeout 5000 + + @spec connect(URI.t(), non_neg_integer() | nil) :: + {:ok, :gen_tcp.socket()} | {:error, :timeout | :inet.posix()} + def connect(%URI{host: host, port: port}, connection_timeout \\ @connection_timeout) do + mockable(:gen_tcp).connect( + to_charlist(host), + port, + [:binary, active: false], + connection_timeout || @connection_timeout + ) + end + + @spec execute(binary(), :gen_tcp.socket(), non_neg_integer() | nil, boolean()) :: + :ok | {:ok, binary()} | {:error, :closed | :timeout | :inet.posix()} + def execute(request, socket, response_timeout, true = _get_response) do + with :ok <- mockable(:gen_tcp).send(socket, request) do + recv(socket, response_timeout) + end + end + + def execute(request, socket, _response_timeout, false = _get_response) do + mockable(:gen_tcp).send(socket, request) + end + + @spec close(:gen_tcp.socket()) :: :ok + def close(socket) do + :gen_tcp.close(socket) + end + + defp recv(socket, response_timeout, length \\ 0, acc \\ <<>>) do + case do_recv(socket, response_timeout, length, acc) do + {:ok, data} -> + case Membrane.RTSP.Response.verify_content_length(data) do + {:ok, _expected, _received} -> + {:ok, data} + + {:error, expected, received} -> + recv(socket, response_timeout, expected - received, data) + end + + {:error, reason} -> + {:error, reason} + end + end + + defp do_recv(socket, response_timeout, length, acc) do + case mockable(:gen_tcp).recv(socket, length, response_timeout || @response_timeout) do + {:ok, data} -> {:ok, acc <> data} + {:error, reason} -> {:error, reason} + end + end +end diff --git a/lib/membrane_rtsp/transport/tcp_socket.ex b/lib/membrane_rtsp/transport/tcp_socket.ex deleted file mode 100644 index 641578c..0000000 --- a/lib/membrane_rtsp/transport/tcp_socket.ex +++ /dev/null @@ -1,74 +0,0 @@ -defmodule Membrane.RTSP.Transport.TCPSocket do - @moduledoc """ - This module implements the Transport behaviour and transmits requests over TCP - Socket keeping connection until either session is closed or connection is - closed by server. - - Supported options: - * timeout - time after request will be deemed missing and error shall be - returned. - """ - use Membrane.RTSP.Transport - import Mockery.Macro - - @connection_timeout 1000 - @response_timeout 5000 - - @impl true - def init(%URI{} = connection_info, options \\ []) do - connection_timeout = options[:connection_timeout] || @connection_timeout - - with {:ok, socket} <- open(connection_info, connection_timeout) do - {:ok, socket} - else - {:error, reason} -> {:error, reason} - end - end - - defp open(%URI{host: host, port: port}, connection_timeout) do - mockable(:gen_tcp).connect( - to_charlist(host), - port, - [:binary, active: false], - connection_timeout - ) - end - - @impl true - def execute(request, socket, options) do - case mockable(:gen_tcp).send(socket, request) do - :ok -> if options[:get_response], do: recv(socket, options), else: :ok - error -> error - end - end - - @impl true - def handle_info({:tcp_closed, _socket}, state) do - {:stop, :socket_closed, state} - end - - @impl true - def close(_state), do: :ok - - defp recv(socket, options, length \\ 0, acc \\ <<>>) do - case do_recv(socket, options, length, acc) do - {:ok, data} -> - case Membrane.RTSP.Response.verify_content_length(data) do - {:ok, _expected, _received} -> {:ok, data} - {:error, expected, received} -> recv(socket, options, expected - received, data) - end - - {:error, reason} -> - {:error, reason} - end - end - - defp do_recv(socket, options, length, acc) do - timeout = options[:response_timeout] || @response_timeout - - case mockable(:gen_tcp).recv(socket, length, timeout) do - {:ok, data} -> {:ok, acc <> data} - {:error, reason} -> {:error, reason} - end - end -end diff --git a/lib/membrane_rtsp/transport/transport.ex b/lib/membrane_rtsp/transport/transport.ex deleted file mode 100644 index 37c8776..0000000 --- a/lib/membrane_rtsp/transport/transport.ex +++ /dev/null @@ -1,51 +0,0 @@ -defmodule Membrane.RTSP.Transport do - @moduledoc """ - Behaviour describing Transport Layer for Real Time Streaming Protocol - """ - - @doc """ - Callback for initialization of transport layer implementation. - - Upon successful initialization, the callback should return {:ok, state}. - Value of state can be anything, but it is recommended that it contains some information that identifies a transport layer instance. - """ - @callback init(url :: URI.t(), options :: Keyword.t()) :: - {:ok, any()} | {:error, any()} - - @doc """ - Callback for handling any transport-layer specific messages. Session will redirect any unknown messages to this callback. - - It is useful for eg. correctly handling :tcp_close message and similar. - """ - @callback handle_info(msg :: any(), state :: any()) :: - {action :: term(), state :: any()} - | {action :: term(), reply :: any(), state :: any()} - - @doc """ - Callback for executing requests with a given transport layer. - """ - @callback execute(request :: any(), state :: any(), options :: Keyword.t()) :: - :ok | {:ok, reply :: any()} | {:error, reason :: any()} - - @doc """ - Callback used for cleaning up the transport layer when the session is closed. - """ - @callback close(state :: any()) :: :ok - - @optional_callbacks handle_info: 2 - - defmacro __using__(_block) do - quote do - @behaviour Membrane.RTSP.Transport - - @impl Membrane.RTSP.Transport - def handle_info(_msg, _state), do: raise("handle_info/2 has not been implemented") - - defoverridable handle_info: 2 - end - end - - @spec new(module(), binary() | URI.t(), Keyword.t()) :: {:ok, any()} | {:error, any()} - @deprecated "Use Membrane.RTSP.init/3 instead. It is not recommended to manually initiate transport" - def new(module, url, options \\ []), do: module.init(url, options) -end diff --git a/test/membrane_rtsp/session/session_integration_test.exs b/test/membrane_rtsp/session/session_integration_test.exs index 6971ec3..2d152d3 100644 --- a/test/membrane_rtsp/session/session_integration_test.exs +++ b/test/membrane_rtsp/session/session_integration_test.exs @@ -4,19 +4,18 @@ defmodule Membrane.RTSP.IntegrationTest do alias Membrane.RTSP alias Membrane.RTSP.{Request, Response} - alias Membrane.RTSP.Transport.TCPSocket @uri "rtsp://wowzaec2demo.streamlock.net:554/vod/mp4:BigBuckBunny_115k.mov" describe "Session works in combination with" do @tag external: true test "real transport" do - integration_test(@uri, TCPSocket) + integration_test(@uri) end end - defp integration_test(uri, transport, options \\ []) do - {:ok, pid} = RTSP.start_link(uri, transport, options) + defp integration_test(uri, options \\ []) do + {:ok, pid} = RTSP.start_link(uri, options) request = %Request{ method: "DESCRIBE", diff --git a/test/membrane_rtsp/session/session_logic_test.exs b/test/membrane_rtsp/session/session_logic_test.exs index ce2cb70..896470a 100644 --- a/test/membrane_rtsp/session/session_logic_test.exs +++ b/test/membrane_rtsp/session/session_logic_test.exs @@ -2,21 +2,23 @@ defmodule Membrane.RTSP.SessionLogicTest do use ExUnit.Case use Bunch import Mockery - import Mockery.Assertions alias Membrane.RTSP - alias Membrane.RTSP.Logic.State - alias Membrane.RTSP.Request - alias Membrane.RTSP.Transport.Fake + alias Membrane.RTSP.State + alias Membrane.RTSP.{Request, TCPSocket} + + @response_header "RTSP/1.0 200 OK\r\n" setup_all do - {:ok, transport} = Fake.init("fake_executor") + uri = "rtsp://localhost:5554/vod/mp4:name.mov" |> URI.parse() + mock(:gen_tcp, :connect, {:ok, nil}) + {:ok, socket} = TCPSocket.connect(uri, 500) state = %State{ - transport: transport, - transport_module: Fake, + socket: socket, cseq: 0, - uri: "rtsp://domain.net:554/vod/mp4:name.mov" |> URI.parse(), + uri: uri, + response_timeout: 500, session_id: "fake_session" } @@ -31,22 +33,23 @@ defmodule Membrane.RTSP.SessionLogicTest do resolved successfully\ """, %{state: state, request: request} do - mock(Fake, [proxy: 2], fn serialized_request, _options -> + mock(:gen_tcp, [send: 2], fn _socket, serialized_request -> assert String.contains?(serialized_request, "\r\nUser-Agent") + mock_response(serialized_request) end) assert {:reply, {:ok, _response}, next_state} = RTSP.handle_call({:execute, request}, nil, state) assert next_state == %State{state | cseq: state.cseq + 1} - assert_called(Fake, proxy: 2) end test "returns an error if response has different session", %{ state: state } do - resolver = fn _request -> {:error, :timeout} end - state = %State{state | execution_options: [resolver: resolver]} + mock(:gen_tcp, [send: 2], fn _socket, _request -> + {:error, :timeout} + end) {:reply, {:error, :timeout}, ^state} = RTSP.handle_call({:execute, %Request{method: "OPTIONS"}}, nil, state) @@ -57,8 +60,9 @@ defmodule Membrane.RTSP.SessionLogicTest do session_id = "arbitrary_string" request = request |> Request.with_header("Session", session_id) - mock(Fake, [proxy: 2], fn serialized_request, _options -> + mock(:gen_tcp, [send: 2], fn _socket, serialized_request -> assert String.contains?(serialized_request, "\r\nSession: " <> session_id <> "\r\n") + mock_response(serialized_request) end) assert {:reply, {:ok, _response}, state} = RTSP.handle_call({:execute, request}, nil, state) @@ -67,16 +71,15 @@ defmodule Membrane.RTSP.SessionLogicTest do assert {:reply, {:ok, _response}, _state} = RTSP.handle_call({:execute, request}, nil, state) - - assert_called(Fake, proxy: 2) end test "add session_id header to request", %{request: request, state: state} do session_id = "arbitrary_string" state = %State{state | session_id: session_id} - mock(Fake, [proxy: 2], fn serialized_request, _options -> + mock(:gen_tcp, [send: 2], fn _socket, serialized_request -> assert String.contains?(serialized_request, "\r\nSession: " <> session_id <> "\r\n") + mock_response(serialized_request) end) assert {:reply, {:ok, _response}, _state} = @@ -90,74 +93,75 @@ defmodule Membrane.RTSP.SessionLogicTest do credentials = "login:password" encoded_credentials = credentials |> Base.encode64() - mock(Fake, [proxy: 2], fn serialized_request, _ref -> + mock(:gen_tcp, [send: 2], fn _socket, serialized_request -> assert String.contains?( serialized_request, "\r\nAuthorization: Basic #{encoded_credentials}\r\n" ) + + mock_response(serialized_request) end) - parsed_uri = URI.parse("rtsp://#{credentials}@domain.net:554/vod/mp4:name.mov") + parsed_uri = URI.parse("rtsp://#{credentials}@localhost:5554/vod/mp4:name.mov") state = %State{state | uri: parsed_uri, auth: :basic} assert {:reply, {:ok, _response}, _state} = RTSP.handle_call({:execute, request}, nil, state) - - assert_called(Fake, proxy: 2) end test "does not apply credentials to request if they were already present", %{state: state} do request = %Request{method: "OPTIONS", headers: [{"Authorization", "Basic data"}]} - mock(Fake, [proxy: 2], fn serialized_request, _ref -> + mock(:gen_tcp, [send: 2], fn _socket, serialized_request -> assert String.contains?( serialized_request, "\r\nAuthorization: Basic data\r\n" ) + + mock_response(serialized_request) end) - parsed_uri = URI.parse("rtsp://login:password@domain.net:554/vod/mp4:name.mov") + parsed_uri = URI.parse("rtsp://login:password@localhost:5554/vod/mp4:name.mov") state = %State{state | uri: parsed_uri} assert {:reply, {:ok, _response}, _state} = RTSP.handle_call({:execute, request}, nil, state) - - assert_called(Fake, proxy: 2) end end test "add digest information in the state", %{state: state, request: request} do - resolver = fn _request -> + mock(:gen_tcp, [send: 2], fn _socket, _request -> {:ok, "RTSP/1.0 200 OK\r\nWWW-Authenticate: Digest realm=\"realm\", nonce=\"nonce\"\r\n\r\n"} - end - - state = %State{state | execution_options: [resolver: resolver]} + end) assert {:reply, {:ok, _response}, state} = RTSP.handle_call({:execute, request}, nil, state) assert state.auth == {:digest, %{nonce: "nonce", realm: "realm"}} - - assert_called(Fake, proxy: 2) end test "digest auth", %{state: state, request: request} do credentials = "login:password" - mock(Fake, [proxy: 2], fn serialized_request, _ref -> + mock(:gen_tcp, [send: 2], fn _socket, serialized_request -> assert String.contains?( serialized_request, - "\r\nAuthorization: Digest username=\"login\", realm=\"realm\", nonce=\"nonce\", uri=\"rtsp://domain.net:554/vod/mp4:name.mov\", response=\"13afc9c7a879e7fbd27bab70d472c4fc\"\r\n" + "\r\nAuthorization: Digest username=\"login\", realm=\"realm\", nonce=\"nonce\", uri=\"rtsp://localhost:5554/vod/mp4:name.mov\", response=\"0e19b16c4576c70fe6b4bf462f2a76b6\"\r\n" ) + + mock_response(serialized_request) end) - parsed_uri = URI.parse("rtsp://#{credentials}@domain.net:554/vod/mp4:name.mov") + parsed_uri = URI.parse("rtsp://#{credentials}@localhost:5554/vod/mp4:name.mov") digest_auth_options = {:digest, %{nonce: "nonce", realm: "realm"}} state = %State{state | uri: parsed_uri, auth: digest_auth_options} assert {:reply, {:ok, _response}, _state} = RTSP.handle_call({:execute, request}, nil, state) + end - assert_called(Fake, proxy: 2) + defp mock_response(request) do + [_line, rest] = String.split(request, "\r\n", parts: 2) + {:ok, @response_header <> rest} end end diff --git a/test/membrane_rtsp/workflow_integration_test.exs b/test/membrane_rtsp/workflow_integration_test.exs index cccf83e..e3d26fd 100644 --- a/test/membrane_rtsp/workflow_integration_test.exs +++ b/test/membrane_rtsp/workflow_integration_test.exs @@ -3,25 +3,23 @@ defmodule Membrane.RTSP.WorkflowIntegrationTest do alias Membrane.RTSP alias Membrane.RTSP.Response - alias Membrane.RTSP.Transport.{Fake, TCPSocket} describe "RTSP workflow executes" do @tag external: true @tag timeout: 80 * 1000 test "over network" do - workflow( - "rtsp://wowzaec2demo.streamlock.net:554/vod/mp4:BigBuckBunny_115k.mov", - TCPSocket - ) + workflow("rtsp://wowzaec2demo.streamlock.net:554/vod/mp4:BigBuckBunny_115k.mov") end test "without internet" do - workflow("rtsp://domain.net:554/vod/mp4:mobvie.mov", Fake, resolver: &resolver/1) + url = "rtsp://localhost:554/vod/mp4:mobvie.mov" |> URI.parse() + spawn(fn -> mock_server_setup(url) end) + workflow(url) end end - defp workflow(url, transport, options \\ []) do - assert {:ok, session} = RTSP.start_link(url, transport, options) + defp workflow(url, options \\ []) do + assert {:ok, session} = RTSP.start_link(url, options) assert {:ok, %Response{status: 200}} = RTSP.describe(session) assert {:ok, %Response{status: 200}} = @@ -38,64 +36,77 @@ defmodule Membrane.RTSP.WorkflowIntegrationTest do assert {:ok, %Response{status: 200}} = RTSP.teardown(session) end - defp resolver(request) do - {_request, response} = List.keyfind(request_mappings(), request, 0) - response + defp mock_server_setup(%URI{port: port}) do + {:ok, listening_socket} = :gen_tcp.listen(port, active: false, mode: :binary) + {:ok, connected_socket} = :gen_tcp.accept(listening_socket) + mock_server_loop(connected_socket) + end + + defp mock_server_loop(socket) do + case :gen_tcp.recv(socket, 0, :infinity) do + {:ok, data} -> + {_request, response} = + List.keyfind(request_mappings(), data, 0) + + :gen_tcp.send(socket, response) + mock_server_loop(socket) + + {:error, :closed} -> + :ok + end end defp request_mappings do - user_agent = Membrane.RTSP.Logic.user_agent() + user_agent = Membrane.RTSP.user_agent() [ {""" - DESCRIBE rtsp://domain.net:554/vod/mp4:mobvie.mov RTSP/1.0 + DESCRIBE rtsp://localhost:554/vod/mp4:mobvie.mov RTSP/1.0 User-Agent: #{user_agent} CSeq: 0\n """ |> format_rtsp_binary(), - {:ok, - """ - RTSP/1.0 200 OK - CSeq: 0 - Server: Wowza Streaming Engine 4.7.5.01 build21752\r\nCache-Control: no-cache - Expires: Tue, 12 Mar 2019 10:48:38 UTC - Content-Length: 587 - Content-Base: rtsp://domain.net:554/vod/mp4:mobvie.mov/ - Date: Tue, 12 Mar 2019 10:48:38 UTC - Content-Type: application/sdp - Session: 369279037;timeout=60 + """ + RTSP/1.0 200 OK + CSeq: 0 + Server: Wowza Streaming Engine 4.7.5.01 build21752 + Cache-Control: no-cache + Expires: Tue, 12 Mar 2019 10:48:38 UTC + Content-Length: 587 + Content-Base: rtsp://localhost:554/vod/mp4:mobvie.mov/ + Date: Tue, 12 Mar 2019 10:48:38 UTC + Content-Type: application/sdp + Session: 369279037;timeout=60 - v=0 - o=- 369279037 369279037 IN IP4 184.72.239.149\r\ns=BigBuckBunny_115k.mov - c=IN IP4 184.72.239.149 - t=0 0 - a=sdplang:en - a=range:npt=0- 596.48 - a=control:* - m=audio 0 RTP/AVP 96 - a=rtpmap:96 mpeg4-generic/12000/2 - a=fmtp:96 profile-level-id=1;mode=AAC-hbr;sizelength=13;indexlength=3;indexdeltalength=3;config=1490 - a=control:trackID=1 - m=video 0 RTP/AVP 97 - a=rtpmap:97 H264/90000 - a=fmtp:97 packetization-mode=1;profile-level-id=42C01E;sprop-parameter-sets=Z0LAHtkDxWhAAAADAEAAAAwDxYuS,aMuMsg== - a=cliprect:0,0,160,240 - a=framesize:97 240-160 - a=framerate:24.0 - a=control:trackID=2 - """}}, - {"SETUP rtsp://domain.net:554/vod/mp4:mobvie.mov/trackID=1 RTSP/1.0\r\nUser-Agent: #{user_agent}\r\nCSeq: 1\r\nSession: 369279037\r\nTransport: RTP/AVP;unicast;client_port=57614-57615\r\n\r\n", - {:ok, - "RTSP/1.0 200 OK\r\nCSeq: 1\r\nServer: Wowza Streaming Engine 4.7.5.01 build21752\r\nCache-Control: no-cache\r\nExpires: Tue, 12 Mar 2019 10:48:38 UTC\r\nTransport: RTP/AVP;unicast;client_port=57614-57615;source=184.72.239.149;server_port=16552-16553;ssrc=63D581FB\r\nDate: Tue, 12 Mar 2019 10:48:38 UTC\r\nSession: 369279037;timeout=60\r\n\r\n"}}, - {"SETUP rtsp://domain.net:554/vod/mp4:mobvie.mov/trackID=2 RTSP/1.0\r\nUser-Agent: #{user_agent}\r\nCSeq: 2\r\nSession: 369279037\r\nTransport: RTP/AVP;unicast;client_port=52614-52615\r\n\r\n", - {:ok, - "RTSP/1.0 200 OK\r\nCSeq: 2\r\nServer: Wowza Streaming Engine 4.7.5.01 build21752\r\nCache-Control: no-cache\r\nExpires: Tue, 12 Mar 2019 10:48:38 UTC\r\nTransport: RTP/AVP;unicast;client_port=52614-52615;source=184.72.239.149;server_port=16582-16583;ssrc=644708C0\r\nDate: Tue, 12 Mar 2019 10:48:38 UTC\r\nSession: 369279037;timeout=60\r\n\r\n"}}, - {"PLAY rtsp://domain.net:554/vod/mp4:mobvie.mov RTSP/1.0\r\nUser-Agent: #{user_agent}\r\nCSeq: 3\r\nSession: 369279037\r\n\r\n", - {:ok, - "RTSP/1.0 200 OK\r\nRTP-Info: url=rtsp://domain.net:554/vod/mp4:mobvie.mov/trackID=1;seq=1;rtptime=0,url=rtsp://domain.net:554/vod/mp4:mobvie.mov/trackID=2;seq=1;rtptime=0\r\nCSeq: 3\r\nServer: Wowza Streaming Engine 4.7.5.01 build21752\r\nCache-Control: no-cache\r\nRange: npt=0.0-\r\nSession: 369279037;timeout=60\r\n\r\n"}}, - {"TEARDOWN rtsp://domain.net:554/vod/mp4:mobvie.mov RTSP/1.0\r\nUser-Agent: #{user_agent}\r\nCSeq: 4\r\nSession: 369279037\r\n\r\n", - {:ok, - "RTSP/1.0 200 OK\r\nCSeq: 4\r\nServer: Wowza Streaming Engine 4.7.5.01 build21752\r\nCache-Control: no-cache\r\nSession: 369279037;timeout=60\r\n\r\n"}} + v=0 + o=- 369279037 369279037 IN IP4 184.72.239.149 + s=BigBuckBunny_115k.mov + c=IN IP4 184.72.239.149 + t=0 0 + a=sdplang:en + a=range:npt=0- 596.48 + a=control:* + m=audio 0 RTP/AVP 96 + a=rtpmap:96 mpeg4-generic/12000/2 + a=fmtp:96 profile-level-id=1;mode=AAC-hbr;sizelength=13;indexlength=3;indexdeltalength=3;config=1490 + a=control:trackID=1 + m=video 0 RTP/AVP 97 + a=rtpmap:97 H264/90000 + a=fmtp:97 packetization-mode=1;profile-level-id=42C01E;sprop-parameter-sets=Z0LAHtkDxWhAAAADAEAAAAwDxYuS,aMuMsg== + a=cliprect:0,0,160,240 + a=framesize:97 240-160 + a=framerate:24.0 + a=control:trackID=2 + """ + |> format_rtsp_binary()}, + {"SETUP rtsp://localhost:554/vod/mp4:mobvie.mov/trackID=1 RTSP/1.0\r\nUser-Agent: #{user_agent}\r\nCSeq: 1\r\nSession: 369279037\r\nTransport: RTP/AVP;unicast;client_port=57614-57615\r\n\r\n", + "RTSP/1.0 200 OK\r\nCSeq: 1\r\nServer: Wowza Streaming Engine 4.7.5.01 build21752\r\nCache-Control: no-cache\r\nExpires: Tue, 12 Mar 2019 10:48:38 UTC\r\nTransport: RTP/AVP;unicast;client_port=57614-57615;source=184.72.239.149;server_port=16552-16553;ssrc=63D581FB\r\nDate: Tue, 12 Mar 2019 10:48:38 UTC\r\nSession: 369279037;timeout=60\r\n\r\n"}, + {"SETUP rtsp://localhost:554/vod/mp4:mobvie.mov/trackID=2 RTSP/1.0\r\nUser-Agent: #{user_agent}\r\nCSeq: 2\r\nSession: 369279037\r\nTransport: RTP/AVP;unicast;client_port=52614-52615\r\n\r\n", + "RTSP/1.0 200 OK\r\nCSeq: 2\r\nServer: Wowza Streaming Engine 4.7.5.01 build21752\r\nCache-Control: no-cache\r\nExpires: Tue, 12 Mar 2019 10:48:38 UTC\r\nTransport: RTP/AVP;unicast;client_port=52614-52615;source=184.72.239.149;server_port=16582-16583;ssrc=644708C0\r\nDate: Tue, 12 Mar 2019 10:48:38 UTC\r\nSession: 369279037;timeout=60\r\n\r\n"}, + {"PLAY rtsp://localhost:554/vod/mp4:mobvie.mov RTSP/1.0\r\nUser-Agent: #{user_agent}\r\nCSeq: 3\r\nSession: 369279037\r\n\r\n", + "RTSP/1.0 200 OK\r\nRTP-Info: url=rtsp://localhost:554/vod/mp4:mobvie.mov/trackID=1;seq=1;rtptime=0,url=rtsp://localhost:554/vod/mp4:mobvie.mov/trackID=2;seq=1;rtptime=0\r\nCSeq: 3\r\nServer: Wowza Streaming Engine 4.7.5.01 build21752\r\nCache-Control: no-cache\r\nRange: npt=0.0-\r\nSession: 369279037;timeout=60\r\n\r\n"}, + {"TEARDOWN rtsp://localhost:554/vod/mp4:mobvie.mov RTSP/1.0\r\nUser-Agent: #{user_agent}\r\nCSeq: 4\r\nSession: 369279037\r\n\r\n", + "RTSP/1.0 200 OK\r\nCSeq: 4\r\nServer: Wowza Streaming Engine 4.7.5.01 build21752\r\nCache-Control: no-cache\r\nSession: 369279037;timeout=60\r\n\r\n"} ] end diff --git a/test/support/fake.ex b/test/support/fake.ex deleted file mode 100644 index 2e35559..0000000 --- a/test/support/fake.ex +++ /dev/null @@ -1,33 +0,0 @@ -defmodule Membrane.RTSP.Transport.Fake do - @moduledoc false - use Membrane.RTSP.Transport - import Mockery.Macro - @response "RTSP/1.0 200 OK\r\n" - - @impl true - def execute(request, ref, options \\ []) do - mockable(__MODULE__).proxy(request, ref) - options = Keyword.merge(ref, options) - resolver = Keyword.get(options, :resolver, &__MODULE__.default_resolver/1) - resolver.(request) - end - - @spec default_resolver(Membrane.RTSP.Request.t()) :: {:ok, binary()} - def default_resolver(request) do - [_line, rest] = String.split(request, "\r\n", parts: 2) - {:ok, @response <> rest} - end - - @impl true - def init(_url, options \\ []) do - {:ok, options} - end - - @impl true - def close(_ref) do - :ok - end - - @spec proxy(Membrane.RTSP.Request.t(), any()) :: nil - def proxy(_request, _ref), do: nil -end