package Mojo::IOLoop::ReadWriteFork;
=head1 NAME
Mojo::IOLoop::ReadWriteFork - Fork a process and read/write from it
=head1 VERSION
0.04
=head1 DESCRIPTION
This class enable you to fork children which you can write data to
and emit events when the child prints to STDERR or STDOUT.
Patches that enable the L</read> event to see the difference between STDERR
and STDOUT are more than welcome.
=head1 SYNOPSIS
=head2 Standalone
my $fork = Mojo::IOLoop::ReadWriteFork->new;
my $cat_result = '';
$fork->on(error => sub {
my($fork, $error) = @_;
warn $error;
});
$fork->on(close => sub {
my($fork, $exit_value, $signal) = @_;
warn "got close event";
Mojo::IOLoop->stop;
});
$fork->on(read => sub {
my($fork, $buffer) = @_; # $buffer = both STDERR and STDOUT
$cat_result .= $buffer;
});
$fork->start(
program => 'bash',
program_args => [ -c => 'echo $YIKES foo bar baz' ],
conduit => 'pty',
);
=head2 In a Mojolicios::Controller
See L<https://github.com/jhthorsen/mojo-ioloop-readwritefork/tree/master/example/tail.pl>.
=cut
use Mojo::Base 'Mojo::EventEmitter';
use IO::Pty;
use Mojo::Util 'url_escape';
use POSIX ':sys_wait_h';
use Scalar::Util ();
use constant CHUNK_SIZE => $ENV{MOJO_CHUNK_SIZE} || 131072;
use constant DEBUG => $ENV{MOJO_READWRITE_FORK_DEBUG} || 0;
use constant WAIT_PID_INTERVAL => $ENV{WAIT_PID_INTERVAL} || 0.01;
our $VERSION = '0.04';
=head1 EVENTS
=head2 close
Emitted when the child process exit.
=head2 error
Emitted when when the there is an issue with creating, writing or reading
from the child process.
=head2 read
Emitted when the child has written a chunk of data to STDOUT or STDERR.
=head1 ATTRIBUTES
=head2 pid
Holds the child process ID.
=cut
has pid => 0;
=head2 reactor
Holds a L<Mojo::Reactor> object. Default is:
Mojo::IOLoop->singleton->reactor;
=cut
has reactor => sub {
require Mojo::IOLoop;
Mojo::IOLoop->singleton->reactor;
};
=head1 METHODS
=head2 start
$self->start(
program => sub { my @program_args = @_; ... },
program_args => [ @data ],
);
$self->start(
program => $str,
program_args => [@str],
conduit => $str, # pipe or pty
raw => $bool,
clone_winsize_from => \*STDIN,
);
Used to fork and exec a child process.
L<raw|IO::Pty> and C<clone_winsize_from|IO::Pty> only makes sense if
C<conduit> is "pty".
=cut
sub start {
my $self = shift;
my $args = ref $_[0] ? $_[0] : {@_};
$args->{env} = { %ENV };
$args->{program} or die 'program is required input';
$args->{conduit} ||= 'pipe';
$args->{program_args} ||= [];
ref $args->{program_args} eq 'ARRAY' or die 'program_args need to be an array';
Scalar::Util::weaken($self);
$self->{delay} = $self->reactor->timer(0 => sub { $self->_start($args) });
$self;
}
sub _start {
local($?, $!);
my($self, $args) = @_;
my($stdout_read, $stdout_write);
my($stdin_read, $stdin_write);
my $pid;
if($args->{conduit} eq 'pipe') {
pipe $stdout_read, $stdout_write or return $self->emit_safe(error => "pipe: $!");
pipe $stdin_read, $stdin_write or return $self->emit_safe(error => "pipe: $!");
select +(select($stdout_write), $| = 1)[0];
select +(select($stdin_write), $| = 1)[0];
}
elsif($args->{conduit} eq 'pty') {
$stdin_write = $stdout_read = IO::Pty->new
}
else {
warn "Invalid conduit ($args->{conduit})\n" if DEBUG;
return $self->emit_safe(error => "Invalid conduit ($args->{conduit})");
}
$pid = fork;
if(!defined $pid) {
warn "Could not fork $!\n" if DEBUG;
$self->emit_safe(error => "Couldn't fork ($!)");
}
elsif($pid) { # parent ===================================================
warn "[$pid] Child starting ($args->{program} @{$args->{program_args}})\n" if DEBUG;
$self->{pid} = $pid;
$self->{stdin_write} = $stdin_write;
$self->{stdout_read} = $stdout_read;
$stdout_read->close_slave if defined $stdout_read and UNIVERSAL::isa($stdout_read, 'IO::Pty');
Scalar::Util::weaken($self);
$self->reactor->io($stdout_read => sub {
$self->{stop} and return;
local($?, $!);
my $reactor = shift;
$self->_read;
# 5 = Input/output error
if($! == 5) {
warn "[$pid] Ignoring child after $!\n" if DEBUG;
$self->kill;
$self->{stop}++;
}
elsif($!) {
warn "[$pid] Child $!\n" if DEBUG;
$self->emit_safe(error => "Read error: $!");
}
});
$self->reactor->watch($stdout_read, 1, 0);
$self->_setup_recurring_child_alive_check($pid);
}
else { # child ===========================================================
if($args->{conduit} eq 'pty') {
$stdin_write->make_slave_controlling_terminal;
$stdin_read = $stdout_write = $stdin_write->slave;
$stdin_read->set_raw if $args->{raw};
$stdin_read->clone_winsize_from($args->{clone_winsize_from}) if $args->{clone_winsize_from};
}
warn "[$$] Starting $args->{program} @{ $args->{program_args} }\n" if DEBUG;
close $stdin_write;
close $stdout_read;
open STDIN, '<&' . fileno $stdin_read or die $!;
open STDOUT, '>&' . fileno $stdout_write or die $!;
open STDERR, '>&' . fileno $stdout_write or die $!;
select STDERR; $| = 1;
select STDOUT; $| = 1;
$ENV{$_} = $args->{env}{$_} for keys %{ $args->{env} };
if(ref $args->{program} eq 'CODE') {
$args->{program}->(@{ $args->{program_args} });
exit 0;
}
else {
exec $args->{program}, @{ $args->{program_args} };
}
}
}
sub _setup_recurring_child_alive_check {
my($self, $pid) = @_;
my $reactor = $self->reactor;
$reactor->{forks}{$pid} = $self;
Scalar::Util::weaken($reactor->{forks}{$pid});
$reactor->{fork_watcher} ||= $reactor->recurring(WAIT_PID_INTERVAL, sub {
my $reactor = shift;
for my $pid (keys %{ $reactor->{forks} }) {
local($?, $!);
local $SIG{CHLD} = 'DEFAULT'; # no idea why i need to do this, but it seems like waitpid() below return -1 if not
my $obj = $reactor->{forks}{$pid} || {};
if(waitpid($pid, WNOHANG) <= 0) {
# NOTE: cannot use kill() to check if the process is alive, since
# the process might be owned by another user.
-d "/proc/$pid" and next;
}
my($exit_value, $signal) = ($? >> 8, $? & 127);
warn "[$pid] Child is dead $exit_value/$signal\n" if DEBUG;
delete $reactor->{forks}{$pid} or next; # SUPER DUPER IMPORTANT THAT THIS next; IS NOT BEFORE waitpid; ABOVE!
$obj->_read; # flush the rest
$obj->emit_safe(close => $exit_value, $signal);
$obj->_cleanup;
}
});
}
=head2 write
$self->write($buffer);
Used to write data to the child process.
=cut
sub write {
my($self, $buffer) = @_;
$self->{stdin_write} or return;
print { $self->{stdin_write} } $buffer;
$self->{stdin_write}->flush or die "Write buffer (" .url_escape($buffer) .") failed: $!";
warn "[${ \$self->pid }] Wrote buffer (" .url_escape($buffer) .")\n" if DEBUG;
}
=head2 kill
$bool = $self->kill;
$bool = $self->kill(15); # default
Used to signal the child.
=cut
sub kill {
my $self = shift;
my $signal = shift // 15;
my $pid = $self->{pid} or return;
warn "[$pid] Kill $signal\n" if DEBUG;
kill $signal, $pid;
}
sub _cleanup {
my $self = shift;
my $reactor = $self->{reactor} or return;
$reactor->watch($self->{stdout_read}, 0, 0) if $self->{stdout_read};
$reactor->remove(delete $self->{stdout_read}) if $self->{stdout_read};
$reactor->remove(delete $self->{delay}) if $self->{delay};
$reactor->remove(delete $self->{stdin_write}) if $self->{stdin_write};
}
sub _read {
my $self = shift;
my $stdout_read = $self->{stdout_read} or return;
my $read = $stdout_read->sysread(my $buffer, CHUNK_SIZE, 0);
return unless defined $read;
return unless $read;
warn "[$self->{pid}] Got buffer (" .url_escape($buffer) .")\n" if DEBUG;
$self->emit_safe(read => $buffer);
}
sub DESTROY { shift->_cleanup }
=head1 AUTHOR
Jan Henning Thorsen - C<jhthorsen@cpan.org>
=cut
1;