package MogileFS::Worker::Replicate;
# replicates files around

use strict;
use base 'MogileFS::Worker';
use fields (
            'fidtodo',       # hashref { fid => 1 }
            'peerrepl',      # hashref { fid => time() } # when peer started replicating
            'rebal_pol_obj', # rebalancer policy object
            );

use List::Util ();
use MogileFS::Util qw(error every debug);
use MogileFS::Class;
use MogileFS::RebalancePolicy::DrainDevices;
use MogileFS::ReplicationRequest qw(rr_upgrade);

# setup the value used in a 'nexttry' field to indicate that this item will never
# actually be tried again and require some sort of manual intervention.
use constant ENDOFTIME => 2147483647;

# { fid => retry_after_time }; instructs us not to replicate this fid until
# the stored unix time has passed.  Entries are dropped once they expire.
my %fidfailure;

# { fid => 1 }; used to keep track of fids we find in the unreachable_fids table
my %unreachable;

# Accessor for the "never retry" nexttry sentinel.
sub end_of_time { ENDOFTIME; }

# Constructor: builds the fields-based object, lets the MogileFS::Worker
# base class initialise itself with $psock, and starts with empty
# fid-to-do and peer-replication bookkeeping.
sub new {
    my ($class, $psock) = @_;
    my $self = fields::new($class);
    $self->SUPER::new($psock);

    $self->{fidtodo}  = {};
    $self->{peerrepl} = {};

    return $self;
}

# Handle one line of inter-process text (presumably passed in by the
# MogileFS::Worker base class for lines received from the parent -- confirm).
# $lineref is a reference to the line.  Returns 1 if the line was consumed,
# 0 if it was not a replicator command.
sub process_line {
    my ($self, $lineref) = @_;

    if ($$lineref =~ /^repl_was_done (\d+)/) {
        delete $self->{fidtodo}{$1};
        return 1;
    }

    if ($$lineref =~ /^repl_starting (\d+)/) {
        my $fidid = $1;
        $self->note_peer_replicating($fidid);
        return 1;
    }

    if ($$lineref =~ /^repl_unreachable (\d+)/) {
        $unreachable{$1} = 1;
        return 1;
    }

    # telnet to main port and do:
    #    !to replicate repl_compat {0,1}
    # to change it in realtime, without restarting.
    if ($$lineref =~ /^repl_compat (\d+)/) {
        MogileFS::Config->set_config("old_repl_compat", $1);
        return 1;
    }

    return 0;
}

# replicator wants a 30 second watchdog timeout (hook consumed by the
# MogileFS::Worker base class, presumably -- confirm units/semantics there).
sub watchdog_timeout { 30; }

# Main loop: every 2 seconds, refresh the unreachable-fid bookkeeping and
# replicate whatever is queued.  When there was nothing to replicate, spend
# the idle time rebalancing and draining devices.
sub work {
    my $self = shift;

    # give the monitor job 15 seconds to give us an update
    my $warn_after = time() + 15;

    every(2.0, sub {
        $self->parent_ping;

        # replication doesn't go well if the monitor job hasn't actively started
        # marking things as being available
        unless ($self->monitor_has_run) {
            error("waiting for monitor job to complete a cycle before beginning replication")
                if time() > $warn_after;
            return;
        }

        $self->validate_dbh;
        my $dbh = $self->get_dbh or return 0;

        # update our unreachable fid list... we consider them good for 15 minutes
        # FIXME: uh, what is this even used for nowadays?  it made more sense in mogilefs 1.x,
        # so maybe we kinda need it for compatibility?  but maybe we could ditch it here and
        # instead use the file_to_replicate table's row sat ENDOFTIME as meaning the same?
        my $urfids = $dbh->selectall_arrayref('SELECT fid, lastupdate FROM unreachable_fids');
        die $dbh->errstr if $dbh->err;
        foreach my $r (@{$urfids || []}) {
            my ($fidid, $lastupdate) = @$r;
            my $nv = $lastupdate + 900;

            # replicate_using_devcounts may already have set an expiration
            # for this fid.  Only overwrite %fidfailure's idea of the
            # expiration time if we are extending it, never shortening it.
            if (!$fidfailure{$fidid} || $fidfailure{$fidid} < $nv) {
                $fidfailure{$fidid} = $nv;
            }
            $unreachable{$fidid} = 1;
        }

        my $idle = 1;

        # this finds stuff to replicate based on its record in the file_to_replicate table
        $idle = 0 if $self->replicate_using_torepl_table;

        # this finds stuff to replicate based on the devcounts.  (old style)
        if (MogileFS::Config->config("old_repl_compat")) {
            $idle = 0 if $self->replicate_using_devcounts;
        }

        # if replicators are otherwise idle, use them to make the world
        # better, rebalancing things (if enabled), and draining devices (if
        # any are marked drain)
        if ($idle) {
            $self->rebalance_devices;
            $self->drain_devices;
        }
    });
}

use constant REPLFETCH_LIMIT => 1000;

# Replicate fids queued in the file_to_replicate table.
# Returns 1 if we did something (or tried to do something), 0 if
# there was nothing to be done.
sub replicate_using_torepl_table {
    my $self = shift;

    # find some fids to replicate, prioritize based on when they should be tried
    my $sto = Mgd::get_store();
    my @to_repl = $sto->files_to_replicate(REPLFETCH_LIMIT)
        or return 0;

    # get random list of hashref of things to do:
    @to_repl = List::Util::shuffle(@to_repl);

    # sort our priority list in terms of 0s (immediate, only 1 copy), 1s (immediate replicate,
    # but we already have 2 copies), and big numbers (unixtimestamps) of things that failed.
    # but because sort is stable (Perl's default mergesort), these are random within
    # their 0/1/big classes.
    @to_repl = sort {
        ($a->{nexttry} < 1000 || $b->{nexttry} < 1000)
            ? ($a->{nexttry} <=> $b->{nexttry})
            : 0
    } @to_repl;

    foreach my $todo (@to_repl) {
        my $fid = $todo->{fid};
        next if $self->peer_is_replicating($fid);

        my $errcode;

        my %opts;
        $opts{errref}    = \$errcode;
        $opts{no_unlock} = 1; # to make it return an $unlock subref
        $opts{source_devid} = $todo->{fromdevid} if $todo->{fromdevid};

        my ($status, $unlock) = replicate($fid, %opts);

        if ($status) {
            # $status is either 0 (failure, handled below), 1 (success, we actually
            # replicated this file), "lost_race" (success, but someone else
            # replicated it / policy was already met), or "nofid" (the fid no
            # longer exists, so there is nothing left to do).

            # when $status eq "lost_race", this delete is unnecessary normally
            # (somebody else presumably already deleted it if they
            # also replicated it), but in the case of running with old
            # replicators from previous versions, -or- simply if the
            # other guy's delete failed, this cleans it up....
            $sto->delete_fid_from_file_to_replicate($fid);
            $unlock->() if $unlock;
            next;
        }

        debug("Replication of fid=$fid failed with errcode=$errcode") if $Mgd::DEBUG >= 2;

        # replicate() may fail without an error code; compare against a
        # defined string so that case simply falls through to backoff.
        my $code = defined $errcode ? $errcode : '';

        # ERROR CASES:
        # README: please keep this up to date if you update the replicate() function so we ensure
        #         that this code always does the right thing
        #
        # -- HARMLESS --
        # failed_getting_lock => harmless.  skip.  somebody else probably doing.
        #
        # -- TEMPORARY; DO EXPONENTIAL BACKOFF --
        # source_down => only source available is observed down.
        # policy_error_doing_failed => policy plugin fucked up.  it's looping.
        # policy_error_already_there => policy plugin fucked up.  it's dumb.
        # policy_no_suggestions => no copy was attempted.  policy is just not happy.
        # copy_error => policy said to do 1+ things, we failed, it ran out of suggestions.
        # (anything else, including no error code at all, is also backed off)
        #
        # -- FATAL; DON'T TRY AGAIN --
        # no_source => it simply exists nowhere.  not that something's down, but file_on is empty.

        # bail if we failed getting the lock, that means someone else probably
        # already did it, so we should just move on
        if ($code eq 'failed_getting_lock') {
            $unlock->() if $unlock;
            next;
        }

        # logic for setting the next try time appropriately
        my $update_nexttry = sub {
            my ($type, $delay) = @_;
            if ($type eq 'end_of_time') {
                # special; update to a time that won't happen again,
                # as we've encountered a scenario in which case we're
                # really hosed
                $sto->reschedule_file_to_replicate_absolute($fid, ENDOFTIME);
            } elsif ($type eq "offset") {
                $sto->reschedule_file_to_replicate_relative($fid, $delay+0);
            } else {
                $sto->reschedule_file_to_replicate_absolute($fid, $delay+0);
            }
        };

        # now let's handle any error we want to consider a total failure; do not
        # retry at any point.  push this file off to the end so someone has to come
        # along and figure out what went wrong.
        if ($code eq 'no_source') {
            $update_nexttry->( end_of_time => 1 );
            $unlock->() if $unlock;
            next;
        }

        # at this point, the rest of the errors require exponential backoff.  define what this means
        # as far as failcount -> delay to next try.
        # 15s, 1m, 5m, 30m, 1h, 2h, 4h, 8h, 24h, 24h, 24h, 24h, ...
        my @backoff = qw( 15 60 300 1800 3600 7200 14400 28800 );
        $update_nexttry->( offset => int(($backoff[$todo->{failcount}] || 86400) * (rand(0.4) + 0.8)) );
        $unlock->() if $unlock;
    }

    return 1;
}

# Old-style (MogileFS 1.x compatible) replication: scan the MySQL 'file'
# table for fids whose devcount is below their class's mindevcount.
# Returns true if any candidate fid was attempted.
sub replicate_using_devcounts {
    my $self = shift;

    # this code path only exists for mogilefsd 1.x compatibility and
    # is not needed in a pure-MogileFS 2.x environment.  and since you
    # can't use non-MySQL in 1.x, it's pointless to port this code to
    # the MogileFS::Store system to make it db portable.  so we just
    # skip here if not using MySQL.
    my $sto = Mgd::get_store();
    return 0 unless $sto->isa("MogileFS::Store::MySQL");

    # call this $mdbh to indiciate it's a MySQL dbh, and to help grepping
    # for old handles.  :)
    my $mdbh = $sto->dbh;

    my $did_something = 0;
    MogileFS::Class->foreach(sub {
        my $mclass = shift;
        my ($dmid, $classid, $min) =
            map { $mclass->$_ } qw(domainid classid mindevcount);

        debug("Checking replication for dmid=$dmid, classid=$classid, min=$min") if $Mgd::DEBUG >= 2;

        my $LIMIT = 1000;

        # try going from devcount of 1 up to devcount of $min-1
        $self->{fidtodo} = {};
        my $fixed     = 0;
        my $attempted = 0;
        my $devcount  = 1;
        while ($fixed < $LIMIT && $devcount < $min) {
            my $now = time();
            $self->still_alive;

            my $fids = $mdbh->selectcol_arrayref("SELECT fid FROM file WHERE dmid=? AND classid=? ".
                                                 "AND devcount = ? AND length IS NOT NULL ".
                                                 "LIMIT $LIMIT", undef, $dmid, $classid, $devcount);
            die $mdbh->errstr if $mdbh->err;
            $fids ||= [];
            $self->{fidtodo}{$_} = 1 foreach @$fids;

            # increase devcount so we try to replicate the files at the next devcount
            $devcount++;

            # see if we have any files to replicate
            my $count = scalar @$fids;
            debug(" found $count for dmid=$dmid/classid=$classid/min=$min") if $Mgd::DEBUG >= 2;
            next unless $count;

            # randomize the list so multiple daemons/threads working on
            # replicate at the same time don't all fight over the
            # same fids to move
            my @randfids = List::Util::shuffle(@$fids);

            debug("Need to replicate: $dmid/$classid: @$fids") if $Mgd::DEBUG >= 2;
            foreach my $fid (@randfids) {
                # now replicate this fid
                $attempted++;
                $did_something = 1;
                next unless $self->{fidtodo}{$fid};
                next if $self->peer_is_replicating($fid);

                # skip fids still inside their failure backoff window;
                # forget the entry once the window has passed
                if ($fidfailure{$fid}) {
                    if ($fidfailure{$fid} < $now) {
                        delete $fidfailure{$fid};
                    } else {
                        next;
                    }
                }

                $self->read_from_parent;
                $self->still_alive;

                if (my $status = replicate($fid)) {
                    # a true $status is 1 (we actually replicated this file),
                    # "lost_race" (someone else replicated it), or "nofid".
                    # on "lost_race" this file is done; go to the next fid.
                    next if $status eq "lost_race";

                    # if it was no longer reachable, mark it reachable
                    if (delete $unreachable{$fid}) {
                        $mdbh->do("DELETE FROM unreachable_fids WHERE fid = ?", undef, $fid);
                        die $mdbh->errstr if $mdbh->err;
                    }

                    # housekeeping
                    $fixed++;
                    $self->send_to_parent("repl_i_did $fid");

                    # status update
                    if ($Mgd::DEBUG >= 1 && $fixed % 20 == 0) {
                        my $ratio = $fixed/$attempted*100;
                        error(sprintf("replicated=$fixed, attempted=$attempted, ratio=%.2f%%", $ratio));
                    }
                } else {
                    # failed in replicate, don't retry for a minute
                    $fidfailure{$fid} = $now + 60;
                }
            }
        }
    });
    return $did_something;
}

# Run the configured rebalance policy for a few seconds, if the
# 'enable_rebalance' server setting is on.  Turns rebalancing off
# when the policy reports no work.
sub rebalance_devices {
    my $self = shift;

    my $sto = Mgd::get_store();
    return 0 unless $sto->server_setting('enable_rebalance');

    my $pol = $self->rebalance_policy_obj or return 0;
    unless ($self->run_rebalance_policy_a_bit($pol)) {
        error("disabling rebalancing due to lack of work");
        MogileFS::Config->set_server_setting("enable_rebalance", 0);
    }
}

# Move files off devices marked as draining, using the DrainDevices policy.
# Returns the number of files handled this pass.
sub drain_devices {
    my $self = shift;
    my $pol = MogileFS::RebalancePolicy::DrainDevices->instance;
    my $rv = $self->run_rebalance_policy_a_bit($pol);
    #error("[$$] drained = $rv\n") if $rv;
    return $rv;
}

# Pull devfids from $pol and rebalance them for up to 5 seconds.
# Returns number of files rebalanced.
sub run_rebalance_policy_a_bit {
    my ($self, $pol) = @_;

    my $stop_at = time() + 5;  # Run for up to 5 seconds, then return.
    my $n = 0;

    my %avoid_devids = map { $_->id => 1 } $pol->dest_devs_to_avoid;

    while (my $dfid = $pol->devfid_to_rebalance) {
        $self->rebalance_devfid($dfid, avoid_devids => \%avoid_devids);
        $n++;
        last if time() >= $stop_at;
    }
    return $n;
}

# Return the rebalance policy object named by the 'rebalance_policy' server
# setting (default MogileFS::RebalancePolicy::PercentFree), caching it while
# the setting is unchanged.  Returns error(...) on a bad/unloadable class.
sub rebalance_policy_obj {
    my $self = shift;
    my $rclass = Mgd::get_store()->server_setting('rebalance_policy') ||
        "MogileFS::RebalancePolicy::PercentFree";

    # return old one, if it's still of the same type.
    if ($self->{'rebal_pol_obj'} && ref($self->{'rebal_pol_obj'}) eq $rclass) {
        return $self->{'rebal_pol_obj'};
    }

    # the class name is validated before being handed to string eval
    return error("Bogus rebalance_policy setting") unless $rclass =~ /^[\w:\-]+$/;
    return error("Failed to load $rclass: $@") unless eval "use $rclass; 1;";
    my $pol = eval { $rclass->new };
    return error("Failed to instantiate rebalance policy: $@") unless $pol;
    return $self->{'rebal_pol_obj'} = $pol;
}

# Copy $devfid's file elsewhere (masking $devfid's own device) and then
# delete the copy on $devfid's device.
# Return 1 on success, 0 on failure.
sub rebalance_devfid {
    my ($self, $devfid, %opts) = @_;
    MogileFS::Util::okay_args(\%opts, qw(avoid_devids));

    my $fid = $devfid->fid;

    # bail out early if this FID is no longer in the namespace (weird
    # case where file is in file_on because not yet deleted, but
    # has been replaced/deleted in 'file' table...).  not too harmful
    # (just noisy) if this line didn't exist, but whatever... it
    # makes stuff cleaner on my intentionally-corrupted-for-fsck-testing
    # dev machine...
    return 1 if ! $fid->exists;

    my ($ret, $unlock) = replicate($fid,
                                   mask_devids  => { $devfid->devid => 1 },
                                   no_unlock    => 1,
                                   avoid_devids => $opts{avoid_devids},
                                   );

    # replicate() is not guaranteed to hand back an unlock sub on every
    # failure path, so only call it when we actually got one.
    my $release = sub { $unlock->() if $unlock; };

    my $fail = sub {
        my $error = shift;
        $release->();
        error("Rebalance for $devfid (" . $devfid->url . ") failed: $error");
        return 0;
    };

    unless ($ret) {
        return $fail->("Replication failed");
    }

    my $should_delete = 0;
    my $del_reason;

    if ($ret eq "lost_race") {
        # for some reason, we did no work. that could be because
        # either 1) we lost the race, as the error code implies,
        # and some other process rebalanced this first, or 2)
        # the file is over-replicated, and everybody just thinks they
        # lost the race because the replication policy said there's
        # nothing to do, even with this devfid masked away.
        # so let's figure it out... if this devfid still exists,
        # we're overreplicated, else we just lost the race.
        if ($devfid->exists) {
            # over-replicated

            # see if some copy, besides this one we want
            # to delete, is currently alive & of right size..
            # just as extra paranoid check before we delete it
            foreach my $test_df ($fid->devfids) {
                next if $test_df->devid == $devfid->devid;
                if ($test_df->size_matches) {
                    $should_delete = 1;
                    $del_reason = "over_replicated";
                    last;
                }
            }
        } else {
            # lost race
            $should_delete = 0; # no-op
        }
    } else {
        $should_delete = 1;
        $del_reason = "did_rebalance;ret=$ret";
    }

    if ($should_delete) {
        eval { $devfid->destroy };
        if ($@) {
            return $fail->("HTTP delete (due to '$del_reason') failed: $@");
        }
    }

    $release->();
    return 1;
}

# replicates $fid to make sure it meets its class' replicate policy.
#
# README: if you update this sub to return a new error code, please update the
#         appropriate callers to know how to deal with the errors returned.
#
# returns either:
#     $rv
#     ($rv, $unlock_sub) -- when 'no_unlock' %opt is used. subref to release lock.
# $rv is one of:
#     0 = failure  (error code, when there is one, written to ${$opts{errref}})
#     1 = success
#     "lost_race" = skipping, we did no work and policy was already met.
#     "nofid" => fid no longer exists. skip replication.
sub replicate {
    my ($fid, %opts) = @_;
    $fid = MogileFS::FID->new($fid) unless ref $fid;
    my $fidid = $fid->id;

    debug("Replication for $fidid called, opts=".join(',',keys(%opts))) if $Mgd::DEBUG >= 2;

    my $errref       = delete $opts{'errref'};
    my $no_unlock    = delete $opts{'no_unlock'};
    my $sdevid       = delete $opts{'source_devid'};
    my $mask_devids  = delete $opts{'mask_devids'}  || {};
    my $avoid_devids = delete $opts{'avoid_devids'} || {};
    die "unknown_opts" if %opts;
    die unless ref $mask_devids eq "HASH";

    # bool: if source was explicitly requested by caller
    my $fixed_source = $sdevid ? 1 : 0;

    my $sto = Mgd::get_store();
    my $unlock = sub {
        $sto->note_done_replicating($fidid);
    };

    # Every return from this sub must go through $retunlock so that the
    # replication lock is released (or handed back to the caller under
    # no_unlock) and ${$errref} is set.
    #   $retunlock->($rv)                     -- success/skip value
    #   $retunlock->(0, $errcode, $message)   -- failure with an error code
    #   $retunlock->(0, $message)             -- failure without an error code
    my $retunlock = sub {
        my $rv = shift;
        my ($errmsg, $errcode);
        if (@_ == 2) {
            ($errcode, $errmsg) = @_;
            $errmsg = "$errcode: $errmsg"; # include code with message
        } else {
            ($errmsg) = @_;
        }
        $$errref = $errcode if $errref;

        my $ret;
        if ($errcode && $errcode eq "failed_getting_lock") {
            # don't emit a warning with error() on lock failure. not
            # a big deal, don't scare people.
            $ret = 0;
        } else {
            $ret = $rv ? $rv : error($errmsg);
        }
        if ($no_unlock) {
            die "ERROR: must be called in list context w/ no_unlock" unless wantarray;
            return ($ret, $unlock);
        } else {
            die "ERROR: must not be called in list context w/o no_unlock" if wantarray;
            $unlock->();
            return $ret;
        }
    };

    # hashref of devid -> MogileFS::Device
    my $devs = MogileFS::Device->map
        or die "No device map";

    return $retunlock->(0, "failed_getting_lock", "Unable to obtain lock for fid $fidid")
        unless $sto->should_begin_replicating_fidid($fidid);

    MogileFS::Worker->send_to_parent("repl_starting $fidid");

    # if the fid doesn't even exist, consider our job done! no point
    # replicating file contents of a file no longer in the namespace.
    return $retunlock->("nofid") unless $fid->exists;

    my $cls = $fid->class;
    my $policy_class = $cls->policy_class;
    eval "use $policy_class; 1;";
    if ($@) {
        # go through $retunlock: the lock is already held at this point, and
        # no_unlock callers expect a ($rv, $unlock) pair back.
        return $retunlock->(0, "Failed to load policy class: $policy_class: $@");
    }

    # learn what this devices file is already on
    my @on_devs;          # all devices fid is on, reachable or not.
    my @on_devs_tellpol;  # subset of @on_devs, to tell the policy class about
    my @on_up_devid;      # subset of @on_devs: just devs that are readable

    foreach my $devid ($fid->devids) {
        my $d = MogileFS::Device->of_devid($devid)
            or next;
        push @on_devs, $d;
        if ($d->dstate->should_have_files && ! $mask_devids->{$devid}) {
            push @on_devs_tellpol, $d;
        }
        if ($d->dstate->can_read_from) {
            push @on_up_devid, $devid;
        }
    }

    return $retunlock->(0, "no_source", "Source is no longer available replicating $fidid")
        if @on_devs == 0;
    return $retunlock->(0, "source_down", "No alive devices available replicating $fidid")
        if @on_up_devid == 0;

    # if they requested a specific source, that source must be up.
    if ($sdevid && ! grep { $_ == $sdevid } @on_up_devid) {
        return $retunlock->(0, "source_down", "Requested replication source device $sdevid not available");
    }

    my %dest_failed;    # devid -> 1 for each devid we were asked to copy to, but failed.
    my %source_failed;  # devid -> 1 for each devid we had problems reading from.
    my $got_copy_request = 0;  # true once replication policy asks us to move something somewhere
    my $copy_err;

    my $rr;  # MogileFS::ReplicationRequest
    while (1) {
        $rr = rr_upgrade($policy_class->replicate_to(
                                                     fid       => $fidid,
                                                     on_devs   => \@on_devs_tellpol, # all device objects fid is on, dead or otherwise
                                                     all_devs  => $devs,
                                                     failed    => \%dest_failed,
                                                     min       => $cls->mindevcount,
                                                     ));

        last if $rr->is_happy;

        my @ddevs;  # dest devs, in order of preferrence
        my $ddevid; # dest devid we've chosen to copy to
        if (@ddevs = $rr->copy_to_one_of_ideally) {
            if (my @not_masked_ids = (grep { ! $mask_devids->{$_} &&
                                             ! $avoid_devids->{$_}
                                           }
                                      map { $_->id } @ddevs)) {
                $ddevid = $not_masked_ids[0];
            } else {
                # once we masked devids away, there were no
                # ideal suggestions.  this is the case of rebalancing,
                # which without this check could 'worsen' the state
                # of the world.  consider the case:
                #    h1[ d1 d2 ] h2[ d3 ]
                # and files are on d1 & d3, an ideal layout.
                # if d3 is being rebalanced, and masked away, the
                # replication policy could presumably say to put
                # the file on d2, even though d3 isn't dead.
                # so instead, when masking is in effect, we don't
                # use non-ideal placement, just bailing out.

                # saying we lost a race is a bit of a lie.. but eh.
                return $retunlock->("lost_race");
            }
        } elsif (@ddevs = $rr->copy_to_one_of_desperate) {
            # TODO: reschedule a replication for 'n' minutes in future, or
            # when new hosts/devices become available or change state
            $ddevid = $ddevs[0]->id;
        } else {
            last;
        }

        $got_copy_request = 1;

        # replication policy shouldn't tell us to put a file on a device
        # we've already told it that we've failed at.  so if we get that response,
        # the policy plugin is broken and we should terminate now.
        if ($dest_failed{$ddevid}) {
            return $retunlock->(0, "policy_error_doing_failed",
                                "replication policy told us to do something we already told it we failed at while replicating fid $fidid");
        }

        # replication policy shouldn't tell us to put a file on a
        # device that it's already on.  that's just stupid.
        if (grep { $_->id == $ddevid } @on_devs) {
            return $retunlock->(0, "policy_error_already_there",
                                "replication policy told us to put fid $fidid on dev $ddevid, but it's already there!");
        }

        # find where we're replicating from
        unless ($fixed_source) {
            # TODO: use an observed good device+host as source to start.
            my @choices = grep { ! $source_failed{$_} } @on_up_devid;
            return $retunlock->(0, "source_down", "No devices available replicating $fidid") unless @choices;
            $sdevid = $choices[int(rand(scalar @choices))];
        }

        my $worker = MogileFS::ProcManager->is_child or die;

        # reset per attempt so a failure is never classified using the
        # error type left over from a previous iteration
        undef $copy_err;
        my $rv = http_copy(
                           sdevid       => $sdevid,
                           ddevid       => $ddevid,
                           fid          => $fidid,
                           expected_len => undef,  # FIXME: get this info to pass along
                           errref       => \$copy_err,
                           callback     => sub { $worker->still_alive; },
                           );
        die "Bogus error code: $copy_err" if !$rv && $copy_err !~ /^(?:src|dest)_error$/;

        unless ($rv) {
            error("Failed copying fid $fidid from devid $sdevid to devid $ddevid (error type: $copy_err)");
            if ($copy_err eq "src_error") {
                $source_failed{$sdevid} = 1;

                if ($fixed_source) {
                    # there can't be any more retries, as this source
                    # is busted and is the only one we wanted.
                    return $retunlock->(0, "copy_error", "error copying fid $fidid from devid $sdevid during replication");
                }

            } else {
                $dest_failed{$ddevid} = 1;
            }
            next;
        }

        my $dfid = MogileFS::DevFID->new($ddevid, $fid);
        $dfid->add_to_db;

        push @on_devs, $devs->{$ddevid};
        push @on_devs_tellpol, $devs->{$ddevid};
    }

    if ($rr->is_happy) {
        return $retunlock->(1) if $got_copy_request;
        return $retunlock->("lost_race");  # some other process got to it first.  policy was happy immediately.
    }

    return $retunlock->(0, "policy_no_suggestions",
                        "replication policy ran out of suggestions for us replicating fid $fidid");
}

my $last_peerreplclean = 0;

# Record (in $self->{peerrepl}) that a peer process started replicating
# $fidid; process_line calls this for "repl_starting" lines.
sub note_peer_replicating {
    my ($self, $fidid) = @_;
    my $now = time();
    $self->{peerrepl}{$fidid} = $now;

    # every minute, clean fids in this set older than 2 minutes
    if ($now > $last_peerreplclean + 60) {
        $last_peerreplclean = $now;
        # deleting the key most recently returned by each() is safe
        while (my ($k, $t) = each %{$self->{peerrepl}}) {
            next if $t > $now - 120;
            delete $self->{peerrepl}{$k};
        }
    }
}

# best effort optimization, doesn't have to be perfect (for instance,
# doesn't currently know what peers on other hosts are doing, only
# peer process).  just try to avoid lock contention trying to ask for
# locks on replicating same files.  we say that if a file was started
# by a peer in last 60 seconds, it's still being replicated.
sub peer_is_replicating {
    my ($self, $fidid) = @_;
    my $t = $self->{peerrepl}{$fidid}
        or return 0;
    my $rv = ($t > time() - 60);
    return $rv;
}

# copies a file from one Perlbal to another utilizing HTTP
#
# Named args: sdevid, ddevid, fid (required); expected_len, callback
# (run after each chunk is written) and errref (scalar ref) are optional.
# Returns 1 on success.  On failure returns false and, when errref is
# given, sets it to "src_error" or "dest_error"; replicate() relies on
# one of those two values always being set when this returns false.
sub http_copy {
    my %opts = @_;
    my ($sdevid, $ddevid, $fid, $expected_clen, $intercopy_cb, $errref) =
        map { delete $opts{$_} } qw(sdevid
                                    ddevid
                                    fid
                                    expected_len
                                    callback
                                    errref
                                    );
    die if %opts;

    $intercopy_cb ||= sub {};

    # handles setting unreachable magic; $error_unreachable->("message")
    my $error_unreachable = sub {
        my $worker = MogileFS::ProcManager->is_child;
        $worker->send_to_parent(":repl_unreachable $fid");
        MogileFS::FID->new($fid)->mark_unreachable;
        $$errref = "src_error" if $errref;
        return error("Fid $fid unreachable while replicating: $_[0]");
    };

    my $dest_error = sub {
        $$errref = "dest_error" if $errref;
        error($_[0]);
        return 0;
    };

    my $src_error = sub {
        $$errref = "src_error" if $errref;
        error($_[0]);
        return 0;
    };

    # get some information we'll need
    my $sdev = MogileFS::Device->of_devid($sdevid);
    my $ddev = MogileFS::Device->of_devid($ddevid);

    unless ($sdev && $ddev && $sdev->exists && $ddev->exists) {
        # blame the side that is missing so the caller can pick another
        # source (src_error) or another destination (dest_error)
        my $msg = "Error: unable to get device information: source=$sdevid, destination=$ddevid, fid=$fid";
        return ($sdev && $sdev->exists) ? $dest_error->($msg) : $src_error->($msg);
    }

    my $s_dfid = MogileFS::DevFID->new($sdev, $fid);
    my $d_dfid = MogileFS::DevFID->new($ddev, $fid);

    my ($spath, $dpath) = (map { $_->uri_path } ($s_dfid, $d_dfid));
    my ($shost, $dhost) = (map { $_->host     } ($sdev, $ddev));

    my ($shostip, $sport) = ($shost->ip, $shost->http_port);
    my ($dhostip, $dport) = ($dhost->ip, $dhost->http_port);
    unless (defined $spath && defined $dpath && defined $shostip && defined $dhostip && $sport && $dport) {
        # show detailed information to find out what's not configured right
        error("Error: unable to replicate file fid=$fid from device id $sdevid to device id $ddevid");
        my $detail = " http://$shostip:$sport$spath -> http://$dhostip:$dport$dpath";
        my $src_ok = defined $spath && defined $shostip && $sport;
        return $src_ok ? $dest_error->($detail) : $src_error->($detail);
    }

    # need by webdav servers, like lighttpd...
    $ddev->vivify_directories($d_dfid->url);

    # setup our pipe error handler, in case we get closed on
    my $pipe_closed = 0;
    local $SIG{PIPE} = sub { $pipe_closed = 1; };

    # okay, now get the file
    my $sock = IO::Socket::INET->new(PeerAddr => $shostip, PeerPort => $sport, Timeout => 2)
        or return $src_error->("Unable to create source socket to $shostip:$sport for $spath");
    $sock->write("GET $spath HTTP/1.0\r\n\r\n");
    return $src_error->("Pipe closed retrieving $spath from $shostip:$sport")
        if $pipe_closed;

    # we just want a content length
    my $clen;
    # FIXME: this can block.  needs to timeout.
    while (defined (my $line = <$sock>)) {
        $line =~ s/[\s\r\n]+$//;
        last unless length $line;
        if ($line =~ m!^HTTP/\d+\.\d+\s+(\d+)!) {
            # make sure we get a good response
            return $error_unreachable->("Error: Resource http://$shostip:$sport$spath failed: HTTP $1")
                unless $1 >= 200 && $1 <= 299;
        }
        next unless $line =~ /^Content-length:\s*(\d+)\s*$/i;
        $clen = $1;
    }
    return $error_unreachable->("File $spath has a content-length of 0; unable to replicate")
        unless $clen;
    return $error_unreachable->("File $spath has unexpected content-length of $clen, not $expected_clen")
        if defined $expected_clen && $clen != $expected_clen;

    # open target for put
    my $dsock = IO::Socket::INET->new(PeerAddr => $dhostip, PeerPort => $dport, Timeout => 2)
        or return $dest_error->("Unable to create dest socket to $dhostip:$dport for $dpath");
    $dsock->write("PUT $dpath HTTP/1.0\r\nContent-length: $clen\r\n\r\n")
        or return $dest_error->("Unable to write data to $dpath on $dhostip:$dport");
    return $dest_error->("Pipe closed during write to $dpath on $dhostip:$dport")
        if $pipe_closed;

    # now read data and print while we're reading.
    my ($data, $written, $remain) = ('', 0, $clen);
    my $bytes_to_read = 1024*1024;  # read 1MB at a time until there's less than that remaining
    $bytes_to_read = $remain if $remain < $bytes_to_read;
    my $finished_read = 0;

    while (!$pipe_closed && (my $bytes = $sock->read($data, $bytes_to_read))) {
        # now we've read in $bytes bytes
        $remain -= $bytes;
        $bytes_to_read = $remain if $remain < $bytes_to_read;

        my $wbytes = $dsock->send($data);
        $written += $wbytes;
        return $dest_error->("Error: wrote $wbytes; expected to write $bytes; failed putting to $dpath")
            unless $wbytes == $bytes;
        $intercopy_cb->();

        die if $bytes_to_read < 0;
        next if $bytes_to_read;
        $finished_read = 1;
        last;
    }
    return $dest_error->("closed pipe writing to destination") if $pipe_closed;
    return $src_error->("error reading midway through source: $!") unless $finished_read;

    # now read in the response line (should be first line)
    my $line = <$dsock>;
    if (defined $line && $line =~ m!^HTTP/\d+\.\d+\s+(\d+)!) {
        return 1 if $1 >= 200 && $1 <= 299;
        return $dest_error->("Got HTTP status code $1 PUTing to http://$dhostip:$dport$dpath");
    } else {
        return $dest_error->("Error: HTTP response line not recognized writing to http://$dhostip:$dport$dpath: $line");
    }
}

1;

# Local Variables:
# mode: perl
# c-basic-indent: 4
# indent-tabs-mode: nil
# End:

__END__

=head1 NAME

MogileFS::Worker::Replicate -- replicates files

=head1 OVERVIEW

This process replicates files enqueued in B<file_to_replicate> table.

The replication policy (which devices to replicate to) is pluggable,
but only one policy comes with the server.

See L<MogileFS::ReplicationPolicy::MultipleHosts>

=head1 SEE ALSO

L<MogileFS::Worker>

L<MogileFS::ReplicationPolicy>

L<MogileFS::ReplicationPolicy::MultipleHosts>