use strict;
use warnings;
use Test::More tests => 19;

use AnyEvent;

BEGIN { use_ok('Data::Stream::Bulk::AnyEvent'); }

my @ret = (([1,2], [3,4], [], [5,6], [1,2]) x 3, undef);
my @expected = @ret;
my $stream =  Data::Stream::Bulk::AnyEvent->new(
	callback => sub {
		my $cv = AE::cv;
		my $ret = shift @ret;
		$cv->send($ret);
		return $cv;
	},
);

# Blocking mode

ok(! $stream->is_done, '!is_done in initial');
is_deeply($stream->next, shift @expected);
is_deeply($stream->next, shift @expected);
is_deeply($stream->next, shift @expected);

# Blocking to callback

my $cv = AE::cv;
my $count = 3;
$stream->cb(sub {
	my $got = shift->recv;
	is_deeply($got, shift @expected);
	$cv->send unless --$count;
	return $count;
});

# Callback to blocking by next

$cv->recv;
is_deeply($stream->next, shift @expected);
is_deeply($stream->next, shift @expected);
is_deeply($stream->next, shift @expected);

# Blocking to callback

$cv = AE::cv;
$count = 3;
$stream->cb(sub {
	my $got = shift->recv;
	is_deeply($got, shift @expected);
	$cv->send unless --$count;
	return $count;
});

# Callback to blocking by setting callback as undef

$cv->recv;
$stream->cb(undef);
foreach my $expected (@expected) {
	is_deeply($stream->next, $expected);
}

ok($stream->is_done, 'is_done');