Discussion:
How to know when a set of spawned processes has finished its work?
Giovanni Giorgi
2021-04-12 16:08:54 UTC
Permalink
Hi all,

a newbie question here.

I have written a small Erlang server following the application behaviour; see here:

https://github.com/daitangio/er_zauker/blob/erlang-24-migration/src/er_zauker_app.erl

To make a long story short, my server scans a set of directories and indexes the files, using Redis as the backend database.

It works well when I run it on a small set of files.

But when I run it on a very large set of files, it seems to "finish" before indexing all the files. When it starts, the client waits until every file is processed and the server can send it a status report:

er_zauker_indexer!{self(),directory,"."},
er_zauker_app:wait_worker_done().

The relevant part seems correct (see below), but I think I have made a stupid mistake somewhere, and I cannot figure out where.
Where can I find an example for this use case?

wait_worker_done() ->
    waitAllWorkerDone(1, erlang:monotonic_time(second)).

waitAllWorkerDone(RunningWorker, StartTimestamp) when RunningWorker > 0 ->
    er_zauker_indexer ! {self(), report},
    receive
        {worker, 0} ->
            io:format("All workers done~n~n");
        {worker, RunningGuys, files_processed, TotalFilesDone} ->
            if
                RunningGuys /= RunningWorker ->
                    SecondsRunning = erlang:monotonic_time(second) - StartTimestamp,
                    FilesSec = TotalFilesDone / SecondsRunning,
                    io:format("[~p]s Workers[~p] Files processed:~p Files/sec: ~p ~n",
                              [SecondsRunning, RunningGuys, TotalFilesDone, FilesSec]),
                    timer:sleep(200);
                true ->
                    %% Okay, so nothing changed so far... sleep a bit
                    timer:sleep(100)
            end,
            %% Master sleep value
            timer:sleep(990),
            waitAllWorkerDone(RunningGuys, StartTimestamp)
    after 5000 ->
        io:format("~n-----------------------------~n"),
        io:format(" Mmmm no info in the last 5 sec... last known running: ~p workers~n",
                  [RunningWorker]),
        io:format(" Is the system stuck?~n"),
        io:format("------------------------------~n"),
        waitAllWorkerDone(RunningWorker, StartTimestamp)
    end;
waitAllWorkerDone(0, _) ->
    io:format("All workers finished~n").
--
Giovanni Giorgi via webmail
Giovanni Giorgi
2021-04-13 10:08:43 UTC
Permalink
Hi Giovanni,
Post by Giovanni Giorgi
https://github.com/daitangio/er_zauker/blob/erlang-24-migration/src/er_zauker_app.erl
first, sorry, I have no idea what this actually does and how it does
it ^^; I only skimmed the code. So what I'm saying here is a bit
guesswork...
From what I understand, you have a component that scans a tree on your
file system and delivers file names. And you have another component
that starts a process to index the files. Right?
...
If I read it correctly, you don't wait until _every file_ has been
_processed_. Instead, you wait until _a certain number of processes
you are monitoring_ (not necessarily the ones you wait for, you only
check if there _is_ a 'DOWN' message, not the monitor refs and/or
pids) has _terminated_ (~line 100), _for whatever reason_ (you don't
check the Reason for the termination). So some processes might have
crashed and not done any work at all.
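Just to illustrate what I mean (an untested sketch, names invented): a
receive loop that actually matches the monitor refs and checks the exit
Reason could look roughly like this, with Refs being a map from monitor
reference to file name:

wait_for(Refs) when map_size(Refs) =:= 0 ->
    ok;
wait_for(Refs) ->
    receive
        {'DOWN', Ref, process, _Pid, normal} when is_map_key(Ref, Refs) ->
            %% This worker finished cleanly.
            wait_for(maps:remove(Ref, Refs));
        {'DOWN', Ref, process, Pid, Reason} when is_map_key(Ref, Refs) ->
            %% This worker crashed, so its file was NOT indexed.
            io:format("worker ~p on ~p died: ~p~n",
                      [Pid, maps:get(Ref, Refs), Reason]),
            wait_for(maps:remove(Ref, Refs))
    end.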
...
and then a worker process may crash because it cannot open a file. But
you don't notice, because of what I said above. May that be it?
Yeah, thank you Maria, I think you are right.

So I suppose my entire code is not well written :(

Where can I find an example of good code for this use case (even in
the standard OTP library, of course; I am not afraid to read OTP code :) ?
--
Giovanni Giorgi via webmail
Maria Scott
2021-04-13 14:49:21 UTC
Permalink
Hi Giovanni,
Post by Giovanni Giorgi
So I suppose my entire code is not well written :(
Let's say I would maybe do it a bit differently ;)))
Post by Giovanni Giorgi
Where can I find an example of good code for this use case (even in
the standard OTP library, of course; I am not afraid to read OTP code :) ?
Hm, sorry, nothing comes readily to mind =^^=

Kind regards,
Maria
Krukoff, John
2021-04-13 19:13:03 UTC
Permalink
Where can I find an example of good code for this use case (even in the standard OTP library, of course; I am not afraid to read OTP code :) ?
Hi Giovanni,

FWIW, when I had a similar need in a simple CLI script I wrote long ago, rpc:pmap/3 was a good solution for me:

https://erlang.org/doc/man/rpc.html#pmap-3

which I used for this small utility script:

https://github.com/jkrukoff/fprof-totals/blob/master/src/fprof_totals.erl#L147

Don't know if it qualifies as "good", but it is at least short.
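
For example, applied to your indexer it might look something like this
(untested, and I am only guessing that er_zauker_util:load_file_if_needed/1
is your per-file entry point):

%% Calls er_zauker_util:load_file_if_needed(File) for every File in
%% parallel; pmap/3 returns only after every call has finished, with
%% the results in the same order as Files.
Results = rpc:pmap({er_zauker_util, load_file_if_needed}, [], Files).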
d***@schoen.or.at
2021-04-13 15:29:53 UTC
Permalink
Hi Giovanni,
I had a quick look at the code, and I think that the messages do not match.
You wait for {worker, 0} in the waitAllWorkerDone function, but I do not think that this message is generated anywhere.
In that function you send a {self(), report} to the daemon, to which it will respond with

{worker, RunningWorker, files_processed, FilesProcessed}.

RunningWorker can eventually become 0 when all workers exit, so the tuple does start with {worker, 0, ...},
but the other two elements prevent it from matching the pattern you expect.
Could this be the issue?
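If so, matching the whole four-element tuple might already be enough; just a guess from skimming the code:

receive
    {worker, 0, files_processed, TotalFilesDone} ->
        io:format("All workers done, ~p files~n", [TotalFilesDone]);
    {worker, StillRunning, files_processed, _SoFar} ->
        io:format("~p workers still running~n", [StillRunning])
end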
Anyway, thanks for sharing your code, it is always interesting to see how somebody else is tackling a problem!
On a more high-level view: are you really interested in the intermediate results?
If I only wanted the end result, I think I would use OTP with a gen_server and a synchronous call.
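A very rough, untested sketch of that idea (module and function names
are invented, and a real version would also have to handle an empty
file list and non-normal exit reasons):

-module(indexer_srv).
-behaviour(gen_server).

-export([start_link/0, index_and_wait/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2]).

start_link() ->
    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).

%% Synchronous: blocks the caller until every worker has gone down.
index_and_wait(Files) ->
    gen_server:call(?MODULE, {index, Files}, infinity).

init([]) ->
    {ok, #{refs => #{}, from => undefined}}.

handle_call({index, Files}, From, State) ->
    %% One monitored worker per file; spawn_monitor/3 is atomic.
    Refs = maps:from_list(
             [begin
                  {_Pid, Ref} =
                      spawn_monitor(er_zauker_util, load_file_if_needed, [F]),
                  {Ref, F}
              end || F <- Files]),
    %% Do not reply yet: remember the caller and reply from handle_info/2.
    {noreply, State#{refs := Refs, from := From}}.

handle_cast(_Msg, State) ->
    {noreply, State}.

handle_info({'DOWN', Ref, process, _Pid, _Reason},
            #{refs := Refs, from := From} = State) ->
    Rest = maps:remove(Ref, Refs),
    case map_size(Rest) of
        0 ->
            gen_server:reply(From, all_done),
            {noreply, State#{refs := #{}, from := undefined}};
        _ ->
            {noreply, State#{refs := Rest}}
    end.

The caller would then simply run indexer_srv:index_and_wait(Files) and block until everything is done, instead of polling for reports.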
kind regards,
dieter
On Mon, Apr 12, 2021 18:36, Giovanni Giorgi wrote:
...
Giovanni Giorgi
2021-04-13 19:14:07 UTC
Permalink
Hi all,

my idea was to be able to monitor the execution, but I should explore
the gen_server + synchronous call approach in the future.

I was able to fix the bug following Maria's suggestion (thank you Maria!).

The failing processes were dying due to a Redis timeout, probably because
I used a Redis MULTI/EXEC transaction, which can lead to race conditions
on the Redis side.

I implemented a small map-based registry to track down failing processes
and respawn them... The idea is to track down only the timeout errors, so
I changed the server to match the "good" and "timeout" 'DOWN' cases like this:

...

{'DOWN', Reference, process, _Pid, normal} ->
    indexerDaemon(RunningWorker - 1, FilesProcessed + 1,
                  maps:remove(Reference, MonitorRefMap));

{'DOWN', Reference, process, Pid, {timeout, Detail}} ->
    %% Hmm, must we assume there are still files to be processed?
    #{Reference := FailedFile} = MonitorRefMap,
    io:format("!! Timeout Error on ~p~n Detail: ~p~n",
              [FailedFile, {'DOWN', Reference, process, Pid, {timeout, Detail}}]),
    %% We assume a timeout and push the file back: remove the old
    %% reference, then respawn the worker and monitor it again.
    UpdatedRefMap = maps:remove(Reference, MonitorRefMap),
    NewPid = spawn(er_zauker_util, load_file_if_needed, [FailedFile]),
    MonitorRef = erlang:monitor(process, NewPid),
    NewRecoveryRefMap = UpdatedRefMap#{MonitorRef => FailedFile},
    indexerDaemon(RunningWorker, FilesProcessed, NewRecoveryRefMap);

I do not know if there is a smarter way of doing it.
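
One small simplification I found afterwards: the spawn/3 + erlang:monitor/2
pair can be replaced by the single atomic spawn_monitor/3, so the respawn
part becomes just:

%% Respawn and monitor in one atomic call, so the new worker is never
%% running unmonitored:
{_NewPid, MonitorRef} = spawn_monitor(er_zauker_util, load_file_if_needed, [FailedFile]),
NewRecoveryRefMap = UpdatedRefMap#{MonitorRef => FailedFile},
indexerDaemon(RunningWorker, FilesProcessed, NewRecoveryRefMap);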

Thank you for your hints!!

--
Giovanni Giorgi via webmail