summaryrefslogtreecommitdiffstats
path: root/plugin/handler_socket/perl-Net-HandlerSocket/lib/Net/HandlerSocket/Pool.pm
diff options
context:
space:
mode:
Diffstat (limited to 'plugin/handler_socket/perl-Net-HandlerSocket/lib/Net/HandlerSocket/Pool.pm')
-rwxr-xr-xplugin/handler_socket/perl-Net-HandlerSocket/lib/Net/HandlerSocket/Pool.pm362
1 files changed, 362 insertions, 0 deletions
diff --git a/plugin/handler_socket/perl-Net-HandlerSocket/lib/Net/HandlerSocket/Pool.pm b/plugin/handler_socket/perl-Net-HandlerSocket/lib/Net/HandlerSocket/Pool.pm
new file mode 100755
index 00000000..b6ea6265
--- /dev/null
+++ b/plugin/handler_socket/perl-Net-HandlerSocket/lib/Net/HandlerSocket/Pool.pm
@@ -0,0 +1,362 @@
+#!/usr/bin/env perl
+
+package Net::HandlerSocket::HSPool;
+
+use strict;
+use warnings;
+use Net::HandlerSocket;
+use Socket;
+
+sub new {
+ my $self = {
+ config => $_[1],
+ reopen_interval => 60,
+ hostmap => { },
+ };
+ return bless $self, $_[0];
+}
+
+sub clear_pool {
+ my ($self) = @_;
+ $self->{hostmap} = { };
+}
+
+sub on_error {
+ my ($self, $obj) = @_;
+ my $error_func = $self->{config}->{error};
+ if (defined($error_func)) {
+ return &{$error_func}($obj);
+ }
+ die $obj;
+}
+
+sub on_warning {
+ my ($self, $obj) = @_;
+ my $warning_func = $self->{config}->{warning};
+ if (defined($warning_func)) {
+ return &{$warning_func}($obj);
+ }
+}
+
+sub get_conf {
+ my ($self, $dbtbl) = @_;
+ my $hcent = $self->{config}->{hostmap}->{$dbtbl};
+ if (!defined($hcent)) {
+ $self->on_error("get_conf: $dbtbl not found");
+ return undef;
+ }
+ my %cpy = %$hcent;
+ $cpy{port} ||= 9998;
+ $cpy{timeout} ||= 2;
+ return \%cpy;
+}
+
+sub resolve_hostname {
+ my ($self, $hcent, $host_ip_list) = @_;
+ if (defined($host_ip_list)) {
+ if (scalar(@$host_ip_list) > 0) {
+ $hcent->{host} = shift(@$host_ip_list);
+ return $host_ip_list;
+ }
+ return undef; # no more ip
+ }
+ my $host = $hcent->{host}; # unresolved name
+ $hcent->{hostname} = $host;
+ my $resolve_list_func = $self->{config}->{resolve_list};
+ if (defined($resolve_list_func)) {
+ $host_ip_list = &{$resolve_list_func}($host);
+ if (scalar(@$host_ip_list) > 0) {
+ $hcent->{host} = shift(@$host_ip_list);
+ return $host_ip_list;
+ }
+ return undef; # no more ip
+ }
+ my $resolve_func = $self->{config}->{resolve};
+ if (defined($resolve_func)) {
+ $hcent->{host} = &{$resolve_func}($host);
+ return [];
+ }
+ my $packed = gethostbyname($host);
+ if (!defined($packed)) {
+ return undef;
+ }
+ $hcent->{host} = inet_ntoa($packed);
+ return [];
+}
+
+sub get_handle_exec {
+ my ($self, $db, $tbl, $idx, $cols, $exec_multi, $exec_args) = @_;
+ my $now = time();
+ my $dbtbl = join('.', $db, $tbl);
+ my $hcent = $self->get_conf($dbtbl); # copy
+ if (!defined($hcent)) {
+ return undef;
+ }
+ my $hmkey = join(':', $hcent->{host}, $hcent->{port});
+ my $hment = $self->{hostmap}->{$hmkey};
+ # [ open_time, handle, index_map, host, next_index_id ]
+ my $host_ip_list;
+ TRY_OTHER_IP:
+ if (!defined($hment) ||
+ $hment->[0] + $self->{reopen_interval} < $now ||
+ !$hment->[1]->stable_point()) {
+ $host_ip_list = $self->resolve_hostname($hcent, $host_ip_list);
+ if (!defined($host_ip_list)) {
+ my $hostport = $hmkey . '(' . $hcent->{host} . ')';
+ $self->on_error("HSPool::get_handle" .
+ "($db, $tbl, $idx, $cols): host=$hmkey: " .
+ "no more active ip");
+ return undef;
+ }
+ my $hnd = new Net::HandlerSocket($hcent);
+ my %m = ();
+ $hment = [ $now, $hnd, \%m, $hcent->{host}, 1 ];
+ $self->{hostmap}->{$hmkey} = $hment;
+ }
+ my $hnd = $hment->[1];
+ my $idxmap = $hment->[2];
+ my $imkey = join(':', $idx, $cols);
+ my $idx_id = $idxmap->{$imkey};
+ if (!defined($idx_id)) {
+ $idx_id = $hment->[4];
+ my $e = $hnd->open_index($idx_id, $db, $tbl, $idx, $cols);
+ if ($e != 0) {
+ my $estr = $hnd->get_error();
+ my $hostport = $hmkey . '(' . $hcent->{host} . ')';
+ my $errmess = "HSPool::get_handle open_index" .
+ "($db, $tbl, $idx, $cols): host=$hostport " .
+ "err=$e($estr)";
+ $self->on_warning($errmess);
+ $hnd->close();
+ $hment = undef;
+ goto TRY_OTHER_IP;
+ }
+ $hment->[4]++;
+ $idxmap->{$imkey} = $idx_id;
+ }
+ if ($exec_multi) {
+ my $resarr;
+ for my $cmdent (@$exec_args) {
+ $cmdent->[0] = $idx_id;
+ }
+ if (scalar(@$exec_args) == 0) {
+ $resarr = [];
+ } else {
+ $resarr = $hnd->execute_multi($exec_args);
+ }
+ my $i = 0;
+ for my $res (@$resarr) {
+ if ($res->[0] != 0) {
+ my $cmdent = $exec_args->[$i];
+ my $ec = $res->[0];
+ my $estr = $res->[1];
+ my $op = $cmdent->[1];
+ my $kfvs = $cmdent->[2];
+ my $kvstr = defined($kfvs)
+ ? join(',', @$kfvs) : '';
+ my $limit = $cmdent->[3] || 0;
+ my $skip = $cmdent->[4] || 0;
+ my $hostport = $hmkey . '(' . $hcent->{host}
+ . ')';
+ my $errmess = "HSPool::get_handle execm" .
+ "($db, $tbl, $idx, [$cols], " .
+ "($idx_id), $op, [$kvstr] " .
+ "$limit, $skip): " .
+ "host=$hostport err=$ec($estr)";
+ if ($res->[0] < 0 || $res->[0] == 2) {
+ $self->on_warning($errmess);
+ $hnd->close();
+ $hment = undef;
+ goto TRY_OTHER_IP;
+ } else {
+ $self->on_error($errmess);
+ }
+ }
+ shift(@$res);
+ ++$i;
+ }
+ return $resarr;
+ } else {
+ my $res = $hnd->execute_find($idx_id, @$exec_args);
+ if ($res->[0] != 0) {
+ my ($op, $kfvals, $limit, $skip) = @$exec_args;
+ my $ec = $res->[0];
+ my $estr = $res->[1];
+ my $kvstr = join(',', @$kfvals);
+ my $hostport = $hmkey . '(' . $hcent->{host} . ')';
+ my $errmess = "HSPool::get_handle exec" .
+ "($db, $tbl, $idx, [$cols], ($idx_id), " .
+ "$op, [$kvstr], $limit, $skip): " .
+ "host=$hostport err=$ec($estr)";
+ if ($res->[0] < 0 || $res->[0] == 2) {
+ $self->on_warning($errmess);
+ $hnd->close();
+ $hment = undef;
+ goto TRY_OTHER_IP;
+ } else {
+ $self->on_error($errmess);
+ }
+ }
+ shift(@$res);
+ return $res;
+ }
+}
+
+sub index_find {
+ my ($self, $db, $tbl, $idx, $cols, $op, $kfvals, $limit, $skip) = @_;
+ # cols: comma separated list
+ # kfvals: arrayref
+ $limit ||= 0;
+ $skip ||= 0;
+ my $res = $self->get_handle_exec($db, $tbl, $idx, $cols,
+ 0, [ $op, $kfvals, $limit, $skip ]);
+ return $res;
+}
+
+sub index_find_multi {
+ my ($self, $db, $tbl, $idx, $cols, $cmdlist) = @_;
+ # cols : comma separated list
+ # cmdlist : [ dummy, op, kfvals, limit, skip ]
+ # kfvals : arrayref
+ my $resarr = $self->get_handle_exec($db, $tbl, $idx, $cols,
+ 1, $cmdlist);
+ return $resarr;
+}
+
+sub result_single_to_arrarr {
+ my ($numcols, $hsres, $ret) = @_;
+ my $hsreslen = scalar(@$hsres);
+ my $rlen = int($hsreslen / $numcols);
+ $ret = [ ] if !defined($ret);
+ my @r = ();
+ my $p = 0;
+ for (my $i = 0; $i < $rlen; ++$i) {
+ my @a = splice(@$hsres, $p, $numcols);
+ $p += $numcols;
+ push(@$ret, \@a);
+ }
+ return $ret; # arrayref of arrayrefs
+}
+
+sub result_multi_to_arrarr {
+ my ($numcols, $mhsres, $ret) = @_;
+ $ret = [ ] if !defined($ret);
+ for my $hsres (@$mhsres) {
+ my $hsreslen = scalar(@$hsres);
+ my $rlen = int($hsreslen / $numcols);
+ my $p = 0;
+ for (my $i = 0; $i < $rlen; ++$i) {
+ my @a = splice(@$hsres, $p, $numcols);
+ $p += $numcols;
+ push(@$ret, \@a);
+ }
+ }
+ return $ret; # arrayref of arrayrefs
+}
+
+sub result_single_to_hasharr {
+ my ($names, $hsres, $ret) = @_;
+ my $nameslen = scalar(@$names);
+ my $hsreslen = scalar(@$hsres);
+ my $rlen = int($hsreslen / $nameslen);
+ $ret = [ ] if !defined($ret);
+ my $p = 0;
+ for (my $i = 0; $i < $rlen; ++$i) {
+ my %h = ();
+ for (my $j = 0; $j < $nameslen; ++$j, ++$p) {
+ $h{$names->[$j]} = $hsres->[$p];
+ }
+ push(@$ret, \%h);
+ }
+ return $ret; # arrayref of hashrefs
+}
+
+sub result_multi_to_hasharr {
+ my ($names, $mhsres, $ret) = @_;
+ my $nameslen = scalar(@$names);
+ $ret = [ ] if !defined($ret);
+ for my $hsres (@$mhsres) {
+ my $hsreslen = scalar(@$hsres);
+ my $rlen = int($hsreslen / $nameslen);
+ my $p = 0;
+ for (my $i = 0; $i < $rlen; ++$i) {
+ my %h = ();
+ for (my $j = 0; $j < $nameslen; ++$j, ++$p) {
+ $h{$names->[$j]} = $hsres->[$p];
+ }
+ push(@$ret, \%h);
+ }
+ }
+ return $ret; # arrayref of hashrefs
+}
+
+sub result_single_to_hashhash {
+ my ($names, $key, $hsres, $ret) = @_;
+ my $nameslen = scalar(@$names);
+ my $hsreslen = scalar(@$hsres);
+ my $rlen = int($hsreslen / $nameslen);
+ $ret = { } if !defined($ret);
+ my $p = 0;
+ for (my $i = 0; $i < $rlen; ++$i) {
+ my %h = ();
+ for (my $j = 0; $j < $nameslen; ++$j, ++$p) {
+ $h{$names->[$j]} = $hsres->[$p];
+ }
+ my $k = $h{$key};
+ $ret->{$k} = \%h if defined($k);
+ }
+ return $ret; # hashref of hashrefs
+}
+
+sub result_multi_to_hashhash {
+ my ($names, $key, $mhsres, $ret) = @_;
+ my $nameslen = scalar(@$names);
+ $ret = { } if !defined($ret);
+ for my $hsres (@$mhsres) {
+ my $hsreslen = scalar(@$hsres);
+ my $rlen = int($hsreslen / $nameslen);
+ my $p = 0;
+ for (my $i = 0; $i < $rlen; ++$i) {
+ my %h = ();
+ for (my $j = 0; $j < $nameslen; ++$j, ++$p) {
+ $h{$names->[$j]} = $hsres->[$p];
+ }
+ my $k = $h{$key};
+ $ret->{$k} = \%h if defined($k);
+ }
+ }
+ return $ret; # hashref of hashrefs
+}
+
+sub select_cols_where_eq_aa {
+ # SELECT $cols FROM $db.$tbl WHERE $idx_key = $kv LIMIT 1
+ my ($self, $db, $tbl, $idx, $cols_aref, $kv_aref) = @_;
+ my $cols_str = join(',', @$cols_aref);
+ my $res = $self->index_find($db, $tbl, $idx, $cols_str, '=', $kv_aref);
+ return result_single_to_arrarr(scalar(@$cols_aref), $res);
+}
+
+sub select_cols_where_eq_hh {
+ # SELECT $cols FROM $db.$tbl WHERE $idx_key = $kv LIMIT 1
+ my ($self, $db, $tbl, $idx, $cols_aref, $kv_aref, $retkey) = @_;
+ my $cols_str = join(',', @$cols_aref);
+ my $res = $self->index_find($db, $tbl, $idx, $cols_str, '=', $kv_aref);
+ my $r = result_single_to_hashhash($cols_aref, $retkey, $res);
+ return $r;
+}
+
+sub select_cols_where_in_hh {
+ # SELECT $cols FROM $db.$tbl WHERE $idx_key in ($vals)
+ my ($self, $db, $tbl, $idx, $cols_aref, $vals_aref, $retkey) = @_;
+ my $cols_str = join(',', @$cols_aref);
+ my @cmdlist = ();
+ for my $v (@$vals_aref) {
+ push(@cmdlist, [ -1, '=', [ $v ] ]);
+ }
+ my $res = $self->index_find_multi($db, $tbl, $idx, $cols_str,
+ \@cmdlist);
+ return result_multi_to_hashhash($cols_aref, $retkey, $res);
+}
+
+1;
+