Skip to content

Commit

Permalink
Merge pull request #298 from leondavi/results_opt
Browse files Browse the repository at this point in the history
[ApiServer] Optimize communication of model phase results
  • Loading branch information
leondavi committed Mar 28, 2024
2 parents 07ee1ef + 8aa5858 commit a2cfe5f
Show file tree
Hide file tree
Showing 13 changed files with 107 additions and 120 deletions.
3 changes: 2 additions & 1 deletion src_cpp/opennnBridge/openNNnif.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,8 @@ void* PredictFun(void* arg)
// Stop the timer and calculate the time took for training
high_resolution_clock::time_point stop = high_resolution_clock::now();
auto duration = duration_cast<microseconds>(stop - PredictNNptr->start_time);
nifpp::TERM predict_time = nifpp::make(env, duration.count());

ERL_NIF_TERM predict_time = enif_make_double(env, duration.count());
nifpp::str_atom nerlnif_atom_str(NERLNIF_ATOM_STR);
nifpp::TERM nerlnif_atom = nifpp::make(env , nerlnif_atom_str);
ERL_NIF_TERM predict_res_and_time = enif_make_tuple(env, 4 , nerlnif_atom , prediction , nifpp::make(env, PredictNNptr->return_tensor_type) , predict_time);
Expand Down
7 changes: 3 additions & 4 deletions src_erl/NerlnetApp/src/Bridge/nerlNIF.erl
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,7 @@ call_to_train(ModelID, {DataTensor, Type}, WorkerPid , BatchID , SourceName)->
{nerlnif, nan, TrainTime} ->
gen_statem:cast(WorkerPid,{loss, nan , TrainTime , BatchID , SourceName}); %TODO Guy - Please define the behavior when this case happens
{nerlnif , LossTensor, LossTensorType , TrainTime}->
{ErlTensor, ErlTensorType} = nerltensor_conversion({LossTensor, LossTensorType}, erl_float), % TODO Guy - Please do the conversion in main server
gen_statem:cast(WorkerPid,{loss, {ErlTensor, ErlTensorType} , TrainTime , BatchID , SourceName})
gen_statem:cast(WorkerPid,{loss, {LossTensor, LossTensorType} , TrainTime , BatchID , SourceName})
after ?TRAIN_TIMEOUT -> %TODO inspect this timeout
?LOG_ERROR("Worker train timeout reached! bid:~p s:~p",[BatchID , SourceName]),
gen_statem:cast(WorkerPid,{loss, timeout , SourceName}) %% TODO Guy Define train timeout state
Expand All @@ -64,11 +63,11 @@ call_to_predict(ModelID, {BatchTensor, Type}, WorkerPid, BatchID , SourceName)->
ok = predict_nif(ModelID, BatchTensor, Type),
receive

{nerlnif , PredNerlTensor, NewType, TimeTook}-> %% nerlnif atom means a message from the nif implementation
{nerlnif , PredNerlTensor, PredNerlTensorType, TimeNif}-> %% nerlnif atom means a message from the nif implementation
% io:format("pred_nif done~n"),
% {PredTen, _NewType} = nerltensor_conversion({PredNerlTensor, NewType}, erl_float),
% io:format("Pred returned: ~p~n", [PredNerlTensor]),
gen_statem:cast(WorkerPid,{predictRes,PredNerlTensor, NewType, TimeTook, BatchID , SourceName});
gen_statem:cast(WorkerPid,{predictRes,PredNerlTensor, PredNerlTensorType, TimeNif, BatchID , SourceName});
Error ->
?LOG_ERROR("received wrong prediction_nif format: ~p" ,[Error]),
throw("received wrong prediction_nif format")
Expand Down
15 changes: 7 additions & 8 deletions src_erl/NerlnetApp/src/Bridge/onnWorkers/workerGeneric.erl
Original file line number Diff line number Diff line change
Expand Up @@ -161,24 +161,23 @@ idle(cast, _Param, State) ->

%% Waiting for receiving results or loss function
%% Got nan or inf from loss function - Error, loss function too big for double
wait(cast, {loss , nan , TimeNIF , BatchID , SourceName}, State = #workerGeneric_state{myName = MyName, nextState = NextState}) ->
wait(cast, {loss, nan , TrainTime , BatchID , SourceName}, State = #workerGeneric_state{myName = MyName, nextState = NextState}) ->
stats:increment_by_value(get(worker_stats_ets), nan_loss_count, 1),
gen_statem:cast(get(client_pid),{loss, MyName , SourceName ,nan , TimeNIF ,BatchID}),
gen_statem:cast(get(client_pid),{loss, MyName , SourceName ,nan , TrainTime ,BatchID}),
{next_state, NextState, State};

wait(cast, {loss, LossTensor , TimeNIF , BatchID , SourceName}, State = #workerGeneric_state{myName = MyName, nextState = NextState, modelID=_ModelID, distributedBehaviorFunc = DistributedBehaviorFunc, distributedWorkerData = DistributedWorkerData}) ->
% {[_ , _ , _ , LossValue] , _} = LossTensor,
% io:format("Got Loss Value ~p~n",[LossValue]),

wait(cast, {loss, {LossTensor, LossTensorType} , TrainTime , BatchID , SourceName}, State = #workerGeneric_state{myName = MyName, nextState = NextState, modelID=_ModelID, distributedBehaviorFunc = DistributedBehaviorFunc, distributedWorkerData = DistributedWorkerData}) ->
BatchTimeStamp = erlang:system_time(nanosecond),
gen_statem:cast(get(client_pid),{loss, MyName, SourceName ,LossTensor , TimeNIF , BatchID , BatchTimeStamp}), %% TODO Add Time and Time_NIF to the cast
gen_statem:cast(get(client_pid),{loss, MyName, SourceName ,{LossTensor, LossTensorType} , TrainTime , BatchID , BatchTimeStamp}),
ToUpdate = DistributedBehaviorFunc(post_train, {get(generic_worker_ets),DistributedWorkerData}),
if ToUpdate -> {next_state, update, State#workerGeneric_state{nextState=NextState}};
true -> {next_state, NextState, State}
end;

wait(cast, {predictRes,PredNerlTensor, Type, TimeNIF, BatchID , SourceName}, State = #workerGeneric_state{myName = MyName, nextState = NextState, distributedBehaviorFunc = DistributedBehaviorFunc, distributedWorkerData = DistributedWorkerData}) ->
wait(cast, {predictRes, PredNerlTensor, PredNerlTensorType, TimeNif, BatchID , SourceName}, State = #workerGeneric_state{myName = MyName, nextState = NextState, distributedBehaviorFunc = DistributedBehaviorFunc, distributedWorkerData = DistributedWorkerData}) ->
BatchTimeStamp = erlang:system_time(nanosecond),
gen_statem:cast(get(client_pid),{predictRes,MyName,SourceName, {PredNerlTensor, Type}, TimeNIF , BatchID , BatchTimeStamp}),
gen_statem:cast(get(client_pid),{predictRes,MyName, SourceName, {PredNerlTensor, PredNerlTensorType}, TimeNif , BatchID , BatchTimeStamp}),
Update = DistributedBehaviorFunc(post_predict, {get(generic_worker_ets),DistributedWorkerData}),
if Update ->
{next_state, update, State#workerGeneric_state{nextState=NextState}};
Expand Down
9 changes: 5 additions & 4 deletions src_erl/NerlnetApp/src/Client/clientStatem.erl
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ training(cast, _In = {predict}, State = #client_statem_state{myName = MyName, et
?LOG_ERROR("Wrong request , client ~p can't go from training to predict directly", [MyName]),
{next_state, training, State#client_statem_state{etsRef = EtsRef}};

training(cast, In = {loss , WorkerName , SourceName , LossTensor , TimeNIF , BatchID , BatchTS}, State = #client_statem_state{myName = MyName,etsRef = EtsRef}) ->
training(cast, In = {loss, WorkerName ,SourceName ,LossTensor ,TimeNIF ,BatchID ,BatchTS}, State = #client_statem_state{myName = MyName,etsRef = EtsRef}) ->
ClientStatsEts = get(client_stats_ets),
stats:increment_messages_received(ClientStatsEts),
stats:increment_bytes_received(ClientStatsEts , nerl_tools:calculate_size(In)),
Expand Down Expand Up @@ -315,14 +315,15 @@ predict(cast, In = {sample,Body}, State = #client_statem_state{etsRef = EtsRef})
end,
{next_state, predict, State#client_statem_state{etsRef = EtsRef}};

predict(cast, In = {predictRes,WorkerName, SourceName ,{PredictNerlTensor, Type} , TimeTook , BatchID , BatchTS}, State = #client_statem_state{myName = _MyName, etsRef = EtsRef}) ->
predict(cast, In = {predictRes,WorkerName, SourceName ,{PredictNerlTensor, NetlTensorType} , TimeTook , BatchID , BatchTS}, State = #client_statem_state{myName = _MyName, etsRef = EtsRef}) ->
ClientStatsEts = get(client_stats_ets),
stats:increment_messages_received(ClientStatsEts),
stats:increment_bytes_received(ClientStatsEts , nerl_tools:calculate_size(In)),

{RouterHost,RouterPort} = ets:lookup_element(EtsRef, my_router, ?DATA_IDX),
MessageBody = {atom_to_list(WorkerName), SourceName, BatchID, {PredictNerlTensor , Type} , TimeTook , BatchTS}, %% SHOULD INCLUDE TYPE?
MessageBody = {WorkerName, SourceName, {PredictNerlTensor , NetlTensorType}, TimeTook, BatchID, BatchTS}, %% SHOULD INCLUDE TYPE?
nerl_tools:http_router_request(RouterHost, RouterPort, [?MAIN_SERVER_ATOM], atom_to_list(predictRes), MessageBody),

stats:increment_messages_sent(ClientStatsEts),
stats:increment_bytes_sent(ClientStatsEts , nerl_tools:calculate_size(MessageBody)),
{next_state, predict, State#client_statem_state{etsRef = EtsRef}};
Expand Down
50 changes: 20 additions & 30 deletions src_erl/NerlnetApp/src/MainServer/mainGenserver.erl
Original file line number Diff line number Diff line change
Expand Up @@ -262,16 +262,16 @@ handle_cast({clientAck,Body}, State = #main_genserver_state{clientsWaitingList =
ClientName = binary_to_term(Body),
NewWaitingList = WaitingList--[ClientName], % waitingList is initialized in clientsTraining or clientsPredict handl cast calls
if length(NewWaitingList) == 0 ->
ResultsToSendStr = generate_phase_result_data_to_send_from_ets_as_str(),
NothingToSend = string:is_empty(ResultsToSendStr),
PhaseResultsDataMap = generate_phase_result_data_map(),
NothingToSend = string:is_empty(PhaseResultsDataMap),
if
NothingToSend -> pass;
true -> Action = case get(active_phase) of
training -> trainRes;
prediction -> predRes
end,
{RouterHost,RouterPort} = ets:lookup_element(get(main_server_ets), my_router, ?DATA_IDX), % get main_server's router
nerl_tools:http_router_request(RouterHost, RouterPort, [?API_SERVER_ATOM], atom_to_list(Action), ResultsToSendStr),
nerl_tools:http_router_request(RouterHost, RouterPort, [?API_SERVER_ATOM], atom_to_list(Action), {json, PhaseResultsDataMap}),
stats:increment_messages_sent(StatsEts),
clean_phase_result_data_to_send_ets() % getting ready for next phase after data was sent to APIServer
end,
Expand Down Expand Up @@ -324,11 +324,12 @@ handle_cast({lossFunction,Body}, State = #main_genserver_state{myName = MyName})
stats:increment_messages_received(StatsEts),
try
case binary_to_term(Body) of
{WorkerName , SourceName , {LossTensor , _Type} , TimeNIF , BatchID , BatchTS} ->
ToSend = ?PHASE_RES_DATA_SEPARATOR ++ atom_to_list(WorkerName) ++ ?PHASE_RES_WORKER_NAME_SEPERATOR ++ atom_to_list(SourceName) ++
?PHASE_RES_VALUES_SEPERATOR ++ nerl_tools:string_format("~p",[LossTensor]) ++ ?PHASE_RES_VALUES_SEPERATOR ++ float_to_list(TimeNIF) ++
?PHASE_RES_VALUES_SEPERATOR ++ integer_to_list(BatchID) ++ ?PHASE_RES_VALUES_SEPERATOR ++ integer_to_list(BatchTS) ++ ?PHASE_RES_DATA_SEPARATOR,
store_phase_result_data_to_send_ets({WorkerName, BatchID , BatchTS}, ToSend);
{WorkerName , SourceName , {LossNerlTensor , LossNerlTensorType} , TimeNIF , BatchID , BatchTS} ->
Key = atom_to_list(WorkerName) ++ ?PHASE_RES_VALUES_IN_KEY_SEPARATOR ++ atom_to_list(SourceName) ++
?PHASE_RES_VALUES_IN_KEY_SEPARATOR ++ integer_to_list(BatchID) ++ ?PHASE_RES_VALUES_IN_KEY_SEPARATOR ++
integer_to_list(BatchTS) ++ ?PHASE_RES_VALUES_IN_KEY_SEPARATOR ++ float_to_list(TimeNIF) ++ ?PHASE_RES_VALUES_IN_KEY_SEPARATOR ++
atom_to_list(LossNerlTensorType),
store_phase_result_data_to_send_ets(Key, binary_to_list(LossNerlTensor));
_ELSE ->
?LOG_ERROR("~p Wrong loss function pattern received from client and its worker ~p", [MyName, Body])
end
Expand All @@ -344,18 +345,12 @@ handle_cast({predictRes,Body}, State) ->
_BatchSize = ets:lookup_element(get(main_server_ets), batch_size, ?DATA_IDX),
stats:increment_messages_received(StatsEts),
try
{WorkerName, SourceName, BatchID, {NerlTensor, Type}, TimeNIF , BatchTS} = binary_to_term(Body), %% TODO: add convention with client
%io:format("WorkerName: ~p, InputName: ~p, BatchID: ~p, Type: ~p~n",[WorkerName, InputName, BatchID, Type]),
{DecodedNerlTensor, _Type} =
if
(NerlTensor==<<>>) -> ?LOG_ERROR(?LOG_HEADER++"Got empty tensor"), empty_nerltensor_err;
true -> nerlNIF:nerltensor_conversion({NerlTensor, Type}, nerlNIF:erl_type_conversion(Type)) % converting nerltensor from binary to erlang type using NerlNIF
end,
ToSend = ?PHASE_RES_DATA_SEPARATOR ++ WorkerName ++ ?PHASE_RES_WORKER_NAME_SEPERATOR ++ atom_to_list(SourceName) ++
?PHASE_RES_VALUES_SEPERATOR ++ nerl_tools:string_format("~p",[DecodedNerlTensor]) ++ ?PHASE_RES_VALUES_SEPERATOR ++
integer_to_list(TimeNIF) ++ ?PHASE_RES_VALUES_SEPERATOR ++ integer_to_list(BatchID) ++ ?PHASE_RES_VALUES_SEPERATOR ++
integer_to_list(BatchTS) ++ ?PHASE_RES_DATA_SEPARATOR,
store_phase_result_data_to_send_ets({WorkerName, BatchID , BatchTS}, ToSend)
{WorkerName, SourceName, {NerlTensor, NerlTensorType}, TimeNIF , BatchID, BatchTS} = binary_to_term(Body),
Key = atom_to_list(WorkerName) ++ ?PHASE_RES_VALUES_IN_KEY_SEPARATOR ++ atom_to_list(SourceName) ++
?PHASE_RES_VALUES_IN_KEY_SEPARATOR ++ integer_to_list(BatchID) ++ ?PHASE_RES_VALUES_IN_KEY_SEPARATOR ++
integer_to_list(BatchTS) ++ ?PHASE_RES_VALUES_IN_KEY_SEPARATOR ++ float_to_list(TimeNIF) ++ ?PHASE_RES_VALUES_IN_KEY_SEPARATOR ++
atom_to_list(NerlTensorType),
store_phase_result_data_to_send_ets(Key, binary_to_list(NerlTensor))
catch Err:E ->
?LOG_ERROR(?LOG_HEADER++"Error receiving predict result ~p",[{Err,E}])
end,
Expand Down Expand Up @@ -471,19 +466,14 @@ retransmission_to_apiserver(HttpRouterRequestFunc, Trials) ->
end.


store_phase_result_data_to_send_ets({WorkerName, BatchID , BatchTS}, DataToSendStr) ->
Key = {WorkerName, BatchID , BatchTS},
ets:insert(get(phase_res_data_ets),{Key, DataToSendStr}).

%% Stores a single phase (train/predict) result in the phase-results ETS table.
%% Key is a flat string of fields joined by ?PHASE_RES_VALUES_IN_KEY_SEPARATOR
%% (worker, source, batch id, timestamp, time, tensor type — see the callers);
%% it is converted to a binary so the ETS key is a compact single term.
%% NerlTensorData is the raw tensor payload as a list (produced via
%% binary_to_list/1 at the call sites). The table handle is read from the
%% process dictionary under phase_res_data_ets.
store_phase_result_data_to_send_ets(Key, NerlTensorData) ->
KeyBin = list_to_binary(Key),
ets:insert(get(phase_res_data_ets),{KeyBin, NerlTensorData}).

%% Concatenates the data field (?DATA_IDX element) of every phase-result ETS
%% entry in the list onto ResString, preserving entry order, and returns the
%% accumulated string.
%% Fixes: the base clause previously bound and returned `_ResString` — reading
%% an underscore-prefixed variable defeats its "intentionally unused" marker
%% and is flagged by the compiler/linters. Also replaces hd/tl with idiomatic
%% head-pattern matching, which crashes with a clearer function_clause on
%% malformed input instead of a badarg inside hd/1.
generate_phase_result_data_string_from_list([], ResString) -> ResString;
generate_phase_result_data_string_from_list([Entry | Rest], ResString) ->
    generate_phase_result_data_string_from_list(Rest, ResString ++ element(?DATA_IDX, Entry)).

generate_phase_result_data_to_send_from_ets_as_str() ->
generate_phase_result_data_map() ->
ListOfData = ets:tab2list(get(phase_res_data_ets)),
generate_phase_result_data_string_from_list(ListOfData, ""). % String to send is retruned
ListOfData.

%% Removes all accumulated phase-result entries from the phase-results ETS
%% table (handle read from the process dictionary), so the table is empty and
%% ready for the next training/prediction phase. Called after results are
%% sent to the API server (see the clientAck handler).
clean_phase_result_data_to_send_ets() ->
ets:delete_all_objects(get(phase_res_data_ets)).
1 change: 1 addition & 0 deletions src_erl/NerlnetApp/src/MainServer/mainServerDefs.hrl
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
-define(API_SERVER_ACTION_ACK, "ackPy").

-define(PHASE_RES_VALUES_IN_KEY_SEPARATOR, "#").
-define(PHASE_RES_WORKER_NAME_SEPERATOR, "#").
-define(PHASE_RES_VALUES_SEPERATOR, "|").
-define(PHASE_RES_DATA_SEPARATOR, "?").
2 changes: 1 addition & 1 deletion src_erl/NerlnetApp/src/Source/sourceStatem.erl
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@


%% defintions
-define(SENDING_FREQUENCY_OVERHEAD_FIX_FACTOR_PERC, 0.85).
-define(SENDING_FREQUENCY_OVERHEAD_FIX_FACTOR_PERC, 0.75).
-define(MICRO_TO_MILLI_FACTOR, 0.001).


Expand Down
18 changes: 13 additions & 5 deletions src_erl/NerlnetApp/src/nerl_tools.erl
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,22 @@ http_router_request(RouterHost, RouterPort, DestinationsList, ActionStr, Body) -
end.


%% http_request/4 with a {json, Body} tag: encodes Body with jsx and forwards
%% to http_request/5 with an application/json content type.
%% NOTE(review): the io:format/2 debug print runs on every JSON send —
%% consider replacing it with a ?LOG_* macro (the rest of the app logs via
%% logger macros, e.g. ?LOG_ERROR).
http_request(Host, Port, Path, {json, Body}) ->
io:format("Sending Json to ~p:~p~n",[Host,Port]),
JsonContentType = ?HTTP_CONTENT_TYPE_JSON,
Json = jsx:encode(Body),
http_request(Host, Port,Path, JsonContentType, Json);
%% Default clause: any untagged Body is sent form-urlencoded, preserving the
%% pre-existing entity-to-entity message behavior.
http_request(Host, Port,Path, Body) ->
DefaultContentType = ?HTTP_CONTENT_TYPE_FORM_URLENCODED,
http_request(Host, Port,Path, DefaultContentType, Body).

%% send message between entities
http_request(Host, Port,Path, Body) when is_atom(Body) -> http_request(Host, Port,Path, atom_to_list(Body));
http_request(Host, Port,Path, Body) when is_binary(Host) -> http_request(binary_to_list(Host), Port,Path, Body);
http_request(Host, Port,Path, Body)->
URL = "http://" ++ Host ++ ":"++integer_to_list(Port) ++ "/" ++ Path,
http_request(Host, Port, Path, ContentType, Body) when is_atom(Body) -> http_request(Host, Port,Path, ContentType, atom_to_list(Body));
http_request(Host, Port, Path, ContentType, Body) when is_binary(Host) -> http_request(binary_to_list(Host), Port,Path, ContentType, Body);
http_request(Host, Port, Path, ContentType, Body)->
URL = "http://" ++ Host ++ ":"++integer_to_list(Port) ++ "/" ++ Path, % Path is the action
httpc:set_options([{proxy, {{Host, Port},[Host]}}]),
httpc:request(post,{URL, [],"application/x-www-form-urlencoded",Body}, [], []).
httpc:request(post,{URL, [], ContentType, Body}, [], []).

get_client_worker_pairs([],_WorkersMap,Ret)-> Ret;
get_client_worker_pairs([WorkerName|WorkersNames],WorkersMap,Ret)->
Expand Down
5 changes: 5 additions & 0 deletions src_erl/NerlnetApp/src/nerl_tools.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,11 @@
-define(VALIDATION_OF_TRANSMISSION_WITH_API_SERVER_INTERVAL_MS, 100). % how much between each resend
%% ETS definitions

%% HTTP Content type definitions
-define(HTTP_CONTENT_TYPE_MULTI_PART_FORM_DATA, "multipart/form-data").
-define(HTTP_CONTENT_TYPE_JSON, "application/json").
-define(HTTP_CONTENT_TYPE_FORM_URLENCODED, "application/x-www-form-urlencoded").

% 2 elements ETS:
-define(KEY_IDX, 1).
-define(DATA_IDX, 2).
Expand Down
Loading

0 comments on commit a2cfe5f

Please sign in to comment.