summaryrefslogtreecommitdiffstats
path: root/tests/support/redis.tcl
diff options
context:
space:
mode:
Diffstat (limited to 'tests/support/redis.tcl')
-rw-r--r--tests/support/redis.tcl418
1 files changed, 418 insertions, 0 deletions
diff --git a/tests/support/redis.tcl b/tests/support/redis.tcl
new file mode 100644
index 0000000..861e8bc
--- /dev/null
+++ b/tests/support/redis.tcl
@@ -0,0 +1,418 @@
+# Tcl client library - used by the Redis test
+# Copyright (C) 2009-2014 Salvatore Sanfilippo
+# Released under the BSD license like Redis itself
+#
+# Example usage:
+#
+# set r [redis 127.0.0.1 6379]
+# $r lpush mylist foo
+# $r lpush mylist bar
+# $r lrange mylist 0 -1
+# $r close
+#
+# Non blocking usage example:
+#
+# proc handlePong {r type reply} {
+# puts "PONG $type '$reply'"
+# if {$reply ne "PONG"} {
+# $r ping [list handlePong]
+# }
+# }
+#
+# set r [redis]
+# $r blocking 0
+# $r get fo [list handlePong]
+#
+# vwait forever
+
+package require Tcl 8.5
+package provide redis 0.1
+
+namespace eval redis {}
+set ::redis::id 0
+array set ::redis::fd {}
+array set ::redis::addr {}
+array set ::redis::blocking {}
+array set ::redis::deferred {}
+array set ::redis::readraw {}
+array set ::redis::attributes {} ;# Holds the RESP3 attributes from the last call
+array set ::redis::reconnect {}
+array set ::redis::tls {}
+array set ::redis::callback {}
+array set ::redis::state {} ;# State in non-blocking reply reading
+array set ::redis::statestack {} ;# Stack of states, for nested mbulks
+
+proc redis {{server 127.0.0.1} {port 6379} {defer 0} {tls 0} {tlsoptions {}} {readraw 0}} {
+ if {$tls} {
+ package require tls
+ ::tls::init \
+ -cafile "$::tlsdir/ca.crt" \
+ -certfile "$::tlsdir/client.crt" \
+ -keyfile "$::tlsdir/client.key" \
+ {*}$tlsoptions
+ set fd [::tls::socket $server $port]
+ } else {
+ set fd [socket $server $port]
+ }
+ fconfigure $fd -translation binary
+ set id [incr ::redis::id]
+ set ::redis::fd($id) $fd
+ set ::redis::addr($id) [list $server $port]
+ set ::redis::blocking($id) 1
+ set ::redis::deferred($id) $defer
+ set ::redis::readraw($id) $readraw
+ set ::redis::reconnect($id) 0
+ set ::redis::tls($id) $tls
+ ::redis::redis_reset_state $id
+ interp alias {} ::redis::redisHandle$id {} ::redis::__dispatch__ $id
+}
+
+# On recent versions of tcl-tls/OpenSSL, reading from a dropped connection
+# results with an error we need to catch and mimic the old behavior.
+proc ::redis::redis_safe_read {fd len} {
+ if {$len == -1} {
+ set err [catch {set val [read $fd]} msg]
+ } else {
+ set err [catch {set val [read $fd $len]} msg]
+ }
+ if {!$err} {
+ return $val
+ }
+ if {[string match "*connection abort*" $msg]} {
+ return {}
+ }
+ error $msg
+}
+
+proc ::redis::redis_safe_gets {fd} {
+ if {[catch {set val [gets $fd]} msg]} {
+ if {[string match "*connection abort*" $msg]} {
+ return {}
+ }
+ error $msg
+ }
+ return $val
+}
+
+# This is a wrapper to the actual dispatching procedure that handles
+# reconnection if needed.
+proc ::redis::__dispatch__ {id method args} {
+ set errorcode [catch {::redis::__dispatch__raw__ $id $method $args} retval]
+ if {$errorcode && $::redis::reconnect($id) && $::redis::fd($id) eq {}} {
+ # Try again if the connection was lost.
+ # FIXME: we don't re-select the previously selected DB, nor we check
+ # if we are inside a transaction that needs to be re-issued from
+ # scratch.
+ set errorcode [catch {::redis::__dispatch__raw__ $id $method $args} retval]
+ }
+ return -code $errorcode $retval
+}
+
+proc ::redis::__dispatch__raw__ {id method argv} {
+ set fd $::redis::fd($id)
+
+ # Reconnect the link if needed.
+ if {$fd eq {} && $method ne {close}} {
+ lassign $::redis::addr($id) host port
+ if {$::redis::tls($id)} {
+ set ::redis::fd($id) [::tls::socket $host $port]
+ } else {
+ set ::redis::fd($id) [socket $host $port]
+ }
+ fconfigure $::redis::fd($id) -translation binary
+ set fd $::redis::fd($id)
+ }
+
+ set blocking $::redis::blocking($id)
+ set deferred $::redis::deferred($id)
+ if {$blocking == 0} {
+ if {[llength $argv] == 0} {
+ error "Please provide a callback in non-blocking mode"
+ }
+ set callback [lindex $argv end]
+ set argv [lrange $argv 0 end-1]
+ }
+ if {[info command ::redis::__method__$method] eq {}} {
+ catch {unset ::redis::attributes($id)}
+ set cmd "*[expr {[llength $argv]+1}]\r\n"
+ append cmd "$[string length $method]\r\n$method\r\n"
+ foreach a $argv {
+ append cmd "$[string length $a]\r\n$a\r\n"
+ }
+ ::redis::redis_write $fd $cmd
+ if {[catch {flush $fd}]} {
+ catch {close $fd}
+ set ::redis::fd($id) {}
+ return -code error "I/O error reading reply"
+ }
+
+ if {!$deferred} {
+ if {$blocking} {
+ ::redis::redis_read_reply $id $fd
+ } else {
+ # Every well formed reply read will pop an element from this
+ # list and use it as a callback. So pipelining is supported
+ # in non blocking mode.
+ lappend ::redis::callback($id) $callback
+ fileevent $fd readable [list ::redis::redis_readable $fd $id]
+ }
+ }
+ } else {
+ uplevel 1 [list ::redis::__method__$method $id $fd] $argv
+ }
+}
+
+proc ::redis::__method__blocking {id fd val} {
+ set ::redis::blocking($id) $val
+ fconfigure $fd -blocking $val
+}
+
+proc ::redis::__method__reconnect {id fd val} {
+ set ::redis::reconnect($id) $val
+}
+
+proc ::redis::__method__read {id fd} {
+ ::redis::redis_read_reply $id $fd
+}
+
+proc ::redis::__method__rawread {id fd {len -1}} {
+ return [redis_safe_read $fd $len]
+}
+
+proc ::redis::__method__write {id fd buf} {
+ ::redis::redis_write $fd $buf
+}
+
+proc ::redis::__method__flush {id fd} {
+ flush $fd
+}
+
+proc ::redis::__method__close {id fd} {
+ catch {close $fd}
+ catch {unset ::redis::fd($id)}
+ catch {unset ::redis::addr($id)}
+ catch {unset ::redis::blocking($id)}
+ catch {unset ::redis::deferred($id)}
+ catch {unset ::redis::readraw($id)}
+ catch {unset ::redis::attributes($id)}
+ catch {unset ::redis::reconnect($id)}
+ catch {unset ::redis::tls($id)}
+ catch {unset ::redis::state($id)}
+ catch {unset ::redis::statestack($id)}
+ catch {unset ::redis::callback($id)}
+ catch {interp alias {} ::redis::redisHandle$id {}}
+}
+
+proc ::redis::__method__channel {id fd} {
+ return $fd
+}
+
+proc ::redis::__method__deferred {id fd val} {
+ set ::redis::deferred($id) $val
+}
+
+proc ::redis::__method__readraw {id fd val} {
+ set ::redis::readraw($id) $val
+}
+
+proc ::redis::__method__readingraw {id fd} {
+ return $::redis::readraw($id)
+}
+
+proc ::redis::__method__attributes {id fd} {
+ set _ $::redis::attributes($id)
+}
+
+proc ::redis::redis_write {fd buf} {
+ puts -nonewline $fd $buf
+}
+
+proc ::redis::redis_writenl {fd buf} {
+ redis_write $fd $buf
+ redis_write $fd "\r\n"
+ flush $fd
+}
+
+proc ::redis::redis_readnl {fd len} {
+ set buf [redis_safe_read $fd $len]
+ redis_safe_read $fd 2 ; # discard CR LF
+ return $buf
+}
+
+proc ::redis::redis_bulk_read {fd} {
+ set count [redis_read_line $fd]
+ if {$count == -1} return {}
+ set buf [redis_readnl $fd $count]
+ return $buf
+}
+
+proc ::redis::redis_multi_bulk_read {id fd} {
+ set count [redis_read_line $fd]
+ if {$count == -1} return {}
+ set l {}
+ set err {}
+ for {set i 0} {$i < $count} {incr i} {
+ if {[catch {
+ lappend l [redis_read_reply $id $fd]
+ } e] && $err eq {}} {
+ set err $e
+ }
+ }
+ if {$err ne {}} {return -code error $err}
+ return $l
+}
+
+proc ::redis::redis_read_map {id fd} {
+ set count [redis_read_line $fd]
+ if {$count == -1} return {}
+ set d {}
+ set err {}
+ for {set i 0} {$i < $count} {incr i} {
+ if {[catch {
+ set k [redis_read_reply $id $fd] ; # key
+ set v [redis_read_reply $id $fd] ; # value
+ dict set d $k $v
+ } e] && $err eq {}} {
+ set err $e
+ }
+ }
+ if {$err ne {}} {return -code error $err}
+ return $d
+}
+
+proc ::redis::redis_read_line fd {
+ string trim [redis_safe_gets $fd]
+}
+
+proc ::redis::redis_read_null fd {
+ redis_safe_gets $fd
+ return {}
+}
+
+proc ::redis::redis_read_bool fd {
+ set v [redis_read_line $fd]
+ if {$v == "t"} {return 1}
+ if {$v == "f"} {return 0}
+ return -code error "Bad protocol, '$v' as bool type"
+}
+
+proc ::redis::redis_read_verbatim_str fd {
+ set v [redis_bulk_read $fd]
+ # strip the first 4 chars ("txt:")
+ return [string range $v 4 end]
+}
+
+proc ::redis::redis_read_reply {id fd} {
+ if {$::redis::readraw($id)} {
+ return [redis_read_line $fd]
+ }
+
+ while {1} {
+ set type [redis_safe_read $fd 1]
+ switch -exact -- $type {
+ _ {return [redis_read_null $fd]}
+ : -
+ ( -
+ + {return [redis_read_line $fd]}
+ , {return [expr {double([redis_read_line $fd])}]}
+ # {return [redis_read_bool $fd]}
+ = {return [redis_read_verbatim_str $fd]}
+ - {return -code error [redis_read_line $fd]}
+ $ {return [redis_bulk_read $fd]}
+ > -
+ ~ -
+ * {return [redis_multi_bulk_read $id $fd]}
+ % {return [redis_read_map $id $fd]}
+ | {
+ set attrib [redis_read_map $id $fd]
+ set ::redis::attributes($id) $attrib
+ continue
+ }
+ default {
+ if {$type eq {}} {
+ catch {close $fd}
+ set ::redis::fd($id) {}
+ return -code error "I/O error reading reply"
+ }
+ return -code error "Bad protocol, '$type' as reply type byte"
+ }
+ }
+ }
+}
+
+proc ::redis::redis_reset_state id {
+ set ::redis::state($id) [dict create buf {} mbulk -1 bulk -1 reply {}]
+ set ::redis::statestack($id) {}
+}
+
+proc ::redis::redis_call_callback {id type reply} {
+ set cb [lindex $::redis::callback($id) 0]
+ set ::redis::callback($id) [lrange $::redis::callback($id) 1 end]
+ uplevel #0 $cb [list ::redis::redisHandle$id $type $reply]
+ ::redis::redis_reset_state $id
+}
+
+# Read a reply in non-blocking mode.
+proc ::redis::redis_readable {fd id} {
+ if {[eof $fd]} {
+ redis_call_callback $id eof {}
+ ::redis::__method__close $id $fd
+ return
+ }
+ if {[dict get $::redis::state($id) bulk] == -1} {
+ set line [gets $fd]
+ if {$line eq {}} return ;# No complete line available, return
+ switch -exact -- [string index $line 0] {
+ : -
+ + {redis_call_callback $id reply [string range $line 1 end-1]}
+ - {redis_call_callback $id err [string range $line 1 end-1]}
+ ( {redis_call_callback $id reply [string range $line 1 end-1]}
+ $ {
+ dict set ::redis::state($id) bulk \
+ [expr [string range $line 1 end-1]+2]
+ if {[dict get $::redis::state($id) bulk] == 1} {
+ # We got a $-1, hack the state to play well with this.
+ dict set ::redis::state($id) bulk 2
+ dict set ::redis::state($id) buf "\r\n"
+ ::redis::redis_readable $fd $id
+ }
+ }
+ * {
+ dict set ::redis::state($id) mbulk [string range $line 1 end-1]
+ # Handle *-1
+ if {[dict get $::redis::state($id) mbulk] == -1} {
+ redis_call_callback $id reply {}
+ }
+ }
+ default {
+ redis_call_callback $id err \
+ "Bad protocol, $type as reply type byte"
+ }
+ }
+ } else {
+ set totlen [dict get $::redis::state($id) bulk]
+ set buflen [string length [dict get $::redis::state($id) buf]]
+ set toread [expr {$totlen-$buflen}]
+ set data [read $fd $toread]
+ set nread [string length $data]
+ dict append ::redis::state($id) buf $data
+ # Check if we read a complete bulk reply
+ if {[string length [dict get $::redis::state($id) buf]] ==
+ [dict get $::redis::state($id) bulk]} {
+ if {[dict get $::redis::state($id) mbulk] == -1} {
+ redis_call_callback $id reply \
+ [string range [dict get $::redis::state($id) buf] 0 end-2]
+ } else {
+ dict with ::redis::state($id) {
+ lappend reply [string range $buf 0 end-2]
+ incr mbulk -1
+ set bulk -1
+ }
+ if {[dict get $::redis::state($id) mbulk] == 0} {
+ redis_call_callback $id reply \
+ [dict get $::redis::state($id) reply]
+ }
+ }
+ }
+ }
+}