Skip to content

Commit

Permalink
Change receiving (#44)
Browse files Browse the repository at this point in the history
* Don't receive from the socket after transfer

* Retry on 401

* Change handle_response signature

* Remove no_response functions

* Dont change socket mode after transfering control

---------

Co-authored-by: Mateusz Front <[email protected]>
  • Loading branch information
Noarkhh and mat-hek committed Sep 18, 2024
1 parent 4cd4834 commit 4bf32ce
Show file tree
Hide file tree
Showing 6 changed files with 80 additions and 81 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ of dependencies in `mix.exs`:
```elixir
def deps do
[
{:membrane_rtsp, "~> 0.9.0"}
{:membrane_rtsp, "~> 0.10.0"}
]
end
```
Expand Down
123 changes: 55 additions & 68 deletions lib/membrane_rtsp/rtsp.ex
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ defmodule Membrane.RTSP do

require Logger
alias Membrane.RTSP
alias Membrane.RTSP.{Request, Response, TCPSocket}
alias Membrane.RTSP.{Request, Response, Transport}

@type t() :: pid()

Expand All @@ -19,15 +19,6 @@ defmodule Membrane.RTSP do

defmodule State do
@moduledoc false
@enforce_keys [:socket, :uri]
defstruct @enforce_keys ++
[
:response_timeout,
:session_id,
cseq: 0,
auth: nil
]

@type digest_opts() :: %{
realm: String.t() | nil,
nonce: String.t() | nil
Expand All @@ -41,8 +32,20 @@ defmodule Membrane.RTSP do
uri: URI.t(),
session_id: binary() | nil,
auth: auth(),
response_timeout: non_neg_integer()
response_timeout: non_neg_integer() | nil,
receive_from: :socket | :external_process,
retries: non_neg_integer()
}

@enforce_keys [:socket, :uri, :response_timeout]
defstruct @enforce_keys ++
[
session_id: nil,
cseq: 0,
auth: nil,
receive_from: :socket,
retries: 0
]
end

@doc """
Expand All @@ -67,15 +70,14 @@ defmodule Membrane.RTSP do
GenServer.call(session, {:execute, request}, :infinity)
end

@spec request_no_response(pid(), binary(), RTSP.headers(), binary(), nil | binary()) :: :ok
def request_no_response(session, method, headers \\ [], body \\ "", path \\ nil) do
request = %Request{method: method, headers: headers, body: body, path: path}
GenServer.cast(session, {:execute, request})
end

@spec close(pid()) :: :ok
def close(session), do: GenServer.cast(session, :terminate)

@doc """
Transfer the control of the TCP socket the session was using to a new process. For more information see `:gen_tcp.controlling_process/2`.
From now on the session won't try to receive responses to requests from the socket, since now an other process is controlling it.
Instead of this, the session will synchronously wait for the response to be supplied with `handle_response/2`.
"""
@spec transfer_socket_control(t(), pid()) ::
:ok | {:error, :closed | :not_owner | :badarg | :inet.posix()}
def transfer_socket_control(session, new_controlling_process) do
Expand All @@ -87,19 +89,13 @@ defmodule Membrane.RTSP do
GenServer.call(session, :get_socket)
end

@type headers :: [{binary(), binary()}]

@spec handle_response(t(), binary()) :: Response.result()
def handle_response(session, raw_response),
do: GenServer.call(session, {:parse_response, raw_response})

@spec get_parameter_no_response(t(), headers(), binary()) :: :ok
def get_parameter_no_response(session, headers \\ [], body \\ ""),
do: request_no_response(session, "GET_PARAMETER", headers, body)
@spec handle_response(t(), binary()) :: :ok
def handle_response(session, raw_response) do
send(session, {:raw_response, raw_response})
:ok
end

@spec play_no_response(t(), headers()) :: :ok
def play_no_response(session, headers \\ []),
do: request_no_response(session, "PLAY", headers, "")
@type headers :: [{binary(), binary()}]

@spec describe(t(), headers()) :: Response.result()
def describe(session, headers \\ []), do: request(session, "DESCRIBE", headers, "")
Expand Down Expand Up @@ -148,7 +144,7 @@ defmodule Membrane.RTSP do
%URI{userinfo: info} when is_binary(info) -> :basic
end

with {:ok, socket} <- TCPSocket.connect(url, options[:connection_timeout]) do
with {:ok, socket} <- Transport.connect(url, options[:connection_timeout]) do
state = %State{
socket: socket,
uri: url,
Expand All @@ -164,53 +160,28 @@ defmodule Membrane.RTSP do

@impl true
def handle_call({:execute, request}, _from, state) do
with {:ok, raw_response} <- execute(request, state),
{:ok, response, state} <- parse_response(raw_response, state) do
{:reply, {:ok, response}, state}
else
{:error, reason} -> {:reply, {:error, reason}, state}
end
handle_execute_call(request, true, state)
end

@impl true
def handle_call(
{:transfer_socket_control, new_controlling_process},
_from,
%State{socket: socket} = state
) do
{:reply, :gen_tcp.controlling_process(socket, new_controlling_process), state}
def handle_call({:transfer_socket_control, new_controlling_process}, _from, state) do
{
:reply,
:gen_tcp.controlling_process(state.socket, new_controlling_process),
%{state | receive_from: :external_process}
}
end

@impl true
def handle_call(:get_socket, _from, %State{socket: socket} = state) do
{:reply, socket, state}
end

@impl true
def handle_call({:parse_response, raw_response}, _from, state) do
with {:ok, response, state} <- parse_response(raw_response, state) do
{:reply, {:ok, response}, state}
else
{:error, reason} -> {:reply, {:error, reason}, state}
end
end

@impl true
def handle_cast(:terminate, %State{} = state) do
{:stop, :normal, state}
end

@impl true
def handle_cast({:execute, request}, %State{cseq: cseq} = state) do
case execute(request, state, false) do
:ok ->
{:noreply, %State{state | cseq: cseq + 1}}

{:error, reason} ->
raise "Error executing request #{inspect(request)}, reason: #{inspect(reason)}"
end
end

@impl true
def handle_info({:tcp, _socket, data}, state) do
Logger.warning("Received an unexpected packet, ignoring: #{inspect(data)}")
Expand All @@ -224,7 +195,7 @@ defmodule Membrane.RTSP do

@impl true
def terminate(_reason, state) do
TCPSocket.close(state.socket)
Transport.close(state.socket)
end

@spec do_start(binary() | URI.t(), options(), (module(), any() -> GenServer.on_start())) ::
Expand All @@ -242,9 +213,8 @@ defmodule Membrane.RTSP do
end
end

@spec execute(Request.t(), State.t(), boolean()) ::
:ok | {:ok, binary()} | {:error, reason :: any()}
defp execute(request, state, get_response \\ true) do
@spec execute(Request.t(), State.t()) :: {:ok, binary()} | {:error, reason :: any()}
defp execute(request, state) do
%State{
cseq: cseq,
socket: socket,
Expand All @@ -260,10 +230,10 @@ defmodule Membrane.RTSP do
|> Request.with_header("User-Agent", @user_agent)
|> apply_credentials(uri, state.auth)
|> Request.stringify(uri)
|> TCPSocket.execute(socket, response_timeout, get_response)
|> Transport.execute(socket, response_timeout, state.receive_from)
end

@spec inject_session_header(Request.t(), binary()) :: Request.t()
@spec inject_session_header(Request.t(), binary() | nil) :: Request.t()
defp inject_session_header(request, session_id) do
case session_id do
nil -> request
Expand Down Expand Up @@ -303,6 +273,23 @@ defmodule Membrane.RTSP do
end
end

@spec handle_execute_call(Request.t(), boolean(), State.t()) ::
{:reply, Response.result(), State.t()}
defp handle_execute_call(request, retry, state) do
with {:ok, raw_response} <- execute(request, state),
{:ok, response, state} <- parse_response(raw_response, state) do
case response do
%Response{status: 401} when retry ->
handle_execute_call(request, false, state)

response ->
{:reply, {:ok, response}, state}
end
else
{:error, reason} -> {:reply, {:error, reason}, state}
end
end

@spec do_apply_credentials(Request.t(), URI.t(), State.auth()) :: Request.t()
defp do_apply_credentials(request, %URI{userinfo: info}, :basic) do
encoded = Base.encode64(info)
Expand Down
28 changes: 20 additions & 8 deletions lib/membrane_rtsp/tcp_socket.ex → lib/membrane_rtsp/transport.ex
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
defmodule Membrane.RTSP.TCPSocket do
defmodule Membrane.RTSP.Transport do
@moduledoc false
import Mockery.Macro

alias Membrane.RTSP.Response

@connection_timeout 1000
@response_timeout 5000

Expand All @@ -16,9 +18,9 @@ defmodule Membrane.RTSP.TCPSocket do
)
end

@spec execute(binary(), :gen_tcp.socket(), non_neg_integer() | nil, boolean()) ::
:ok | {:ok, binary()} | {:error, :closed | :timeout | :inet.posix()}
def execute(request, socket, response_timeout, true = _get_response) do
@spec execute(binary(), :gen_tcp.socket(), non_neg_integer() | nil, :socket | :external_process) ::
{:ok, binary()} | {:error, :closed | :timeout | :inet.posix()}
def execute(request, socket, response_timeout, :socket) do
:inet.setopts(socket, active: false)

result =
Expand All @@ -30,8 +32,14 @@ defmodule Membrane.RTSP.TCPSocket do
result
end

def execute(request, socket, _response_timeout, false = _get_response) do
mockable(:gen_tcp).send(socket, request)
def execute(request, socket, response_timeout, :external_process) do
with :ok <- mockable(:gen_tcp).send(socket, request) do
receive do
{:raw_response, response} -> {:ok, response}
after
response_timeout || @response_timeout -> {:error, :timeout}
end
end
end

@spec close(:gen_tcp.socket()) :: :ok
Expand All @@ -42,7 +50,7 @@ defmodule Membrane.RTSP.TCPSocket do
defp recv(socket, response_timeout, length \\ 0, acc \\ <<>>) do
case do_recv(socket, response_timeout, length, acc) do
{:ok, data} ->
case Membrane.RTSP.Response.verify_content_length(data) do
case Response.verify_content_length(data) do
{:ok, _expected, _received} ->
{:ok, data}

Expand All @@ -56,7 +64,11 @@ defmodule Membrane.RTSP.TCPSocket do
end

defp do_recv(socket, response_timeout, length, acc) do
case mockable(:gen_tcp).recv(socket, length, response_timeout || @response_timeout) do
case mockable(:gen_tcp).recv(
socket,
length,
response_timeout || @response_timeout
) do
{:ok, data} -> {:ok, acc <> data}
{:error, reason} -> {:error, reason}
end
Expand Down
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
defmodule Membrane.RTSP.MixProject do
use Mix.Project

@version "0.9.1"
@version "0.10.0"
@github_url "https://github.com/membraneframework/membrane_rtsp"

def project do
Expand Down
2 changes: 1 addition & 1 deletion mix.lock
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
"elixir_uuid": {:hex, :elixir_uuid, "1.2.1", "dce506597acb7e6b0daeaff52ff6a9043f5919a4c3315abb4143f0b00378c097", [:mix], [], "hexpm", "f7eba2ea6c3555cea09706492716b0d87397b88946e6380898c2889d68585752"},
"erlex": {:hex, :erlex, "0.2.7", "810e8725f96ab74d17aac676e748627a07bc87eb950d2b83acd29dc047a30595", [:mix], [], "hexpm", "3ed95f79d1a844c3f6bf0cea61e0d5612a42ce56da9c03f01df538685365efb0"},
"ex_doc": {:hex, :ex_doc, "0.34.2", "13eedf3844ccdce25cfd837b99bea9ad92c4e511233199440488d217c92571e8", [:mix], [{:earmark_parser, "~> 1.4.39", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_c, ">= 0.1.0", [hex: :makeup_c, repo: "hexpm", optional: true]}, {:makeup_elixir, "~> 0.14 or ~> 1.0", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1 or ~> 1.0", [hex: :makeup_erlang, repo: "hexpm", optional: false]}, {:makeup_html, ">= 0.1.0", [hex: :makeup_html, repo: "hexpm", optional: true]}], "hexpm", "5ce5f16b41208a50106afed3de6a2ed34f4acfd65715b82a0b84b49d995f95c1"},
"ex_sdp": {:hex, :ex_sdp, "0.17.0", "4c50e7814f01f149c0ccf258fba8428f8567dffecf1c416ec3f6aaaac607a161", [:mix], [{:bunch, "~> 1.3", [hex: :bunch, repo: "hexpm", optional: false]}, {:elixir_uuid, "~> 1.2", [hex: :elixir_uuid, repo: "hexpm", optional: false]}], "hexpm", "c7fe0625902be2a835b5fe6834a189f7db7639d2625c8e9d8b3564e6d704145f"},
"ex_sdp": {:hex, :ex_sdp, "1.0.0", "c66cd66d60ad03ff1eecdc6db6a1b8a7b89fec260fcc22e8d6703fc5bbf430a3", [:mix], [{:bunch, "~> 1.3", [hex: :bunch, repo: "hexpm", optional: false]}, {:elixir_uuid, "~> 1.2", [hex: :elixir_uuid, repo: "hexpm", optional: false]}], "hexpm", "e165dff988b8ab9d93588636aa5f3f683e1f848fc63b78b12382c8fa3dd39216"},
"file_system": {:hex, :file_system, "0.2.10", "fb082005a9cd1711c05b5248710f8826b02d7d1784e7c3451f9c1231d4fc162d", [:mix], [], "hexpm", "41195edbfb562a593726eda3b3e8b103a309b733ad25f3d642ba49696bf715dc"},
"jason": {:hex, :jason, "1.4.4", "b9226785a9aa77b6857ca22832cffa5d5011a667207eb2a0ad56adb5db443b8a", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "c5eb0cab91f094599f94d55bc63409236a8ec69a21a67814529e8d5f6cc90b3b"},
"makeup": {:hex, :makeup, "1.1.2", "9ba8837913bdf757787e71c1581c21f9d2455f4dd04cfca785c70bbfff1a76a3", [:mix], [{:nimble_parsec, "~> 1.2.2 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "cce1566b81fbcbd21eca8ffe808f33b221f9eee2cbc7a1706fc3da9ff18e6cac"},
Expand Down
4 changes: 2 additions & 2 deletions test/membrane_rtsp/session/session_logic_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,14 @@ defmodule Membrane.RTSP.SessionLogicTest do

alias Membrane.RTSP
alias Membrane.RTSP.State
alias Membrane.RTSP.{Request, TCPSocket}
alias Membrane.RTSP.{Request, Transport}

@response_header "RTSP/1.0 200 OK\r\n"

setup_all do
uri = "rtsp://localhost:5554/vod/mp4:name.mov" |> URI.parse()
mock(:gen_tcp, :connect, :gen_tcp.listen(0, []))
{:ok, socket} = TCPSocket.connect(uri, 500)
{:ok, socket} = Transport.connect(uri, 500)

state = %State{
socket: socket,
Expand Down

0 comments on commit 4bf32ce

Please sign in to comment.