Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions crates/go/src/package/wit_types/wit_future.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@ type FutureReader[T any] struct {
handle *wit_runtime.Handle
}

// Blocks until the future completes and returns its value.
//
// # Panic
//
// Read will panic if multiple concurrent or sequential reads are attempted on the same future.
func (self *FutureReader[T]) Read() T {
handle := self.handle.Take()
defer self.vtable.DropReadable(handle)
Expand Down Expand Up @@ -53,6 +58,7 @@ func (self *FutureReader[T]) Read() T {
}
}

// Notify the host that the FutureReader is no longer being used.
func (self *FutureReader[T]) Drop() {
handle := self.handle.TakeOrNil()
if handle != 0 {
Expand Down Expand Up @@ -85,6 +91,11 @@ type FutureWriter[T any] struct {
handle *wit_runtime.Handle
}

// Writes data to a future.
//
// # Panic
//
// Write will panic if multiple concurrent or sequential writes are attempted on the same future.
func (self *FutureWriter[T]) Write(item T) bool {
handle := self.handle.Take()
defer self.vtable.DropWritable(handle)
Expand Down Expand Up @@ -119,6 +130,7 @@ func (self *FutureWriter[T]) Write(item T) bool {
}
}

// Notify the host that the FutureWriter is no longer being used.
func (self *FutureWriter[T]) Drop() {
handle := self.handle.TakeOrNil()
if handle != 0 {
Expand Down
31 changes: 29 additions & 2 deletions crates/go/src/package/wit_types/wit_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,22 @@ func (self *StreamReader[T]) WriterDropped() bool {
return self.writerDropped
}

// Reads data from a stream into a destination slice.
//
// Blocks until the read completes or the destination slice is full.
//
// # Panic
//
// Read will panic if:
// - dst is empty (length 0)
// - multiple concurrent reads are attempted on the same stream
func (self *StreamReader[T]) Read(dst []T) uint32 {
handle := self.handle.Use()
if len(dst) == 0 {
panic("StreamReader.Read: destination slice cannot be empty")
}

handle := self.handle.Take()
defer self.handle.Set(handle)

if self.writerDropped {
return 0
Expand Down Expand Up @@ -68,6 +82,7 @@ func (self *StreamReader[T]) Read(dst []T) uint32 {
return count
}

// Notify the host that the StreamReader is no longer being used.
func (self *StreamReader[T]) Drop() {
handle := self.handle.TakeOrNil()
if handle != 0 {
Expand Down Expand Up @@ -105,8 +120,14 @@ func (self *StreamWriter[T]) ReaderDropped() bool {
return self.readerDropped
}

// Writes items to a stream, returning the count written (may be partial).
//
// # Panic
//
// Write will panic if multiple concurrent writes are attempted on the same stream.
func (self *StreamWriter[T]) Write(items []T) uint32 {
handle := self.handle.Use()
handle := self.handle.Take()
defer self.handle.Set(handle)

if self.readerDropped {
return 0
Expand Down Expand Up @@ -152,6 +173,11 @@ func (self *StreamWriter[T]) Write(items []T) uint32 {
return count
}

// Writes all items to the stream, looping until complete or reader drops.
//
// # Panic
//
// WriteAll will panic if multiple concurrent writes are attempted on the same stream.
func (self *StreamWriter[T]) WriteAll(items []T) uint32 {
offset := uint32(0)
count := uint32(len(items))
Expand All @@ -161,6 +187,7 @@ func (self *StreamWriter[T]) WriteAll(items []T) uint32 {
return offset
}

// Notify the host that the StreamReader is no longer being used.
func (self *StreamWriter[T]) Drop() {
handle := self.handle.TakeOrNil()
if handle != 0 {
Expand Down
70 changes: 70 additions & 0 deletions tests/runtime-async/async/simple-future/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,80 @@ func Run() {
(<-read)
assert(!(<-write))
}

{
tx, rx := test.MakeFutureUnit()
syncBarrier := make(chan struct{})
panicCh := make(chan any, 2)

for range 2 {
go func() {
// Because the channel is empty, it will block until it's closed, at which
// point all Goroutines will attempt to simultaneously read from the future.
<-syncBarrier
panicCh <- checkPanicValue(func() {
rx.Read()
})
}()
}
close(syncBarrier)

go func() {
// If this is omitted, the host will see that the "rx.Read" operations aren't paired with
// a "tx.Write" and will result in a "wasm trap: deadlock detected" error. Additionally,
// this is placed after "close(syncBarrier)" to ensure that the panics are resulting from
// concurrent reads, and not from other scenarios that result in a nil handle.
tx.Write(wit_types.Unit{})
}()

p1, p2 := <-panicCh, <-panicCh

// One should succeed (nil), one should panic
assert((p1 == nil && p2 == "nil handle") || (p1 == "nil handle" && p2 == nil))
}

{
tx, rx := test.MakeFutureUnit()
syncBarrier := make(chan struct{})
panicCh := make(chan any, 2)

for range 2 {
go func() {
// Because the channel is empty, it will block until it's closed, at which
// point all Goroutines will attempt to simultaneously write to the future.
<-syncBarrier
panicCh <- checkPanicValue(func() {
tx.Write(wit_types.Unit{})
})
}()
}
close(syncBarrier)

go func() {
// If this is omitted, the host will see that the "tx.Write" operations aren't paired with
// an "rx.Read" and will result in a "wasm trap: deadlock detected" error. Additionally,
// this is placed after "close(syncBarrier)" to ensure that the panics are resulting from
// concurrent writes, and not from other scenarios that result in a nil handle.
rx.Read()
}()

p1, p2 := <-panicCh, <-panicCh

// One should succeed (nil), one should panic
assert((p1 == nil && p2 == "nil handle") || (p1 == "nil handle" && p2 == nil))
}
}

func assert(v bool) {
if !v {
panic("assertion failed")
}
}

func checkPanicValue(f func()) (value any) {
defer func() {
value = recover()
}()
f()
return nil
}
104 changes: 89 additions & 15 deletions tests/runtime-async/async/simple-stream/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,26 +11,92 @@ func Run() {
write := make(chan wit_types.Unit)
read := make(chan wit_types.Unit)

tx, rx := test.MakeStreamUnit()
go func() {
assertEqual(tx.Write([]wit_types.Unit{wit_types.Unit{}}), 1)
assert(!tx.ReaderDropped())
{
tx, rx := test.MakeStreamUnit()
go func() {
assertEqual(tx.Write([]wit_types.Unit{wit_types.Unit{}}), 1)
assert(!tx.ReaderDropped())

assertEqual(tx.Write([]wit_types.Unit{wit_types.Unit{}, wit_types.Unit{}}), 2)
assertEqual(tx.Write([]wit_types.Unit{wit_types.Unit{}, wit_types.Unit{}}), 2)

assertEqual(tx.Write([]wit_types.Unit{wit_types.Unit{}, wit_types.Unit{}}), 0)
assert(tx.ReaderDropped())
assertEqual(tx.Write([]wit_types.Unit{wit_types.Unit{}, wit_types.Unit{}}), 0)
assert(tx.ReaderDropped())

write <- wit_types.Unit{}
}()
write <- wit_types.Unit{}
}()

go func() {
test.ReadStream(rx)
read <- wit_types.Unit{}
}()
go func() {
test.ReadStream(rx)
read <- wit_types.Unit{}
}()

(<-read)
(<-write)
}

{
tx, rx := test.MakeStreamUnit()
syncBarrier := make(chan struct{})
panicCh := make(chan any, 2)

for range 2 {
go func() {
// Because the channel is empty, it will block until it's closed, at which
// point all Goroutines will attempt to simultaneously read from the stream.
<-syncBarrier
panicCh <- checkPanicValue(func() {
result := make([]wit_types.Unit, 1)
rx.Read(result)
})
}()
}
close(syncBarrier)

go func() {
// If this is omitted, the host will see that the "rx.Read" operations aren't paired with
// a "tx.WriteAll" and will result in a "wasm trap: deadlock detected" error. Additionally,
// this is placed after "close(syncBarrier)" to ensure that the panics are resulting from
// concurrent reads, and not from other scenarios that result in a nil handle.
tx.WriteAll([]wit_types.Unit{wit_types.Unit{}})
}()

p1, p2 := <-panicCh, <-panicCh

// One should succeed (nil), one should panic
assert((p1 == nil && p2 == "nil handle") || (p1 == "nil handle" && p2 == nil))
}

{
tx, rx := test.MakeStreamUnit()
syncBarrier := make(chan struct{})
panicCh := make(chan any, 2)

for range 2 {
go func() {
// Because the channel is empty, it will block until it's closed, at which
// point all Goroutines will attempt to simultaneously write to the stream.
<-syncBarrier
panicCh <- checkPanicValue(func() {
tx.WriteAll([]wit_types.Unit{wit_types.Unit{}})
})
}()
}
close(syncBarrier)

go func() {
// If this is omitted, the host will see that the "tx.WriteAll" operations aren't paired with
// an "rx.Read" and will result in a "wasm trap: deadlock detected" error. Additionally,
// this is placed after "close(syncBarrier)" to ensure that the panics are resulting from
// concurrent writes, and not from other scenarios that result in a nil handle.
result := make([]wit_types.Unit, 1)
rx.Read(result)
}()

p1, p2 := <-panicCh, <-panicCh

(<-read)
(<-write)
// One should succeed (nil), one should panic
assert((p1 == nil && p2 == "nil handle") || (p1 == "nil handle" && p2 == nil))
}
}

func assertEqual[T comparable](a, b T) {
Expand All @@ -44,3 +110,11 @@ func assert(v bool) {
panic("assertion failed")
}
}

func checkPanicValue(f func()) (value any) {
defer func() {
value = recover()
}()
f()
return nil
}
Loading