Skip to content

Make the stream types more capable and use them in more places#58

Open
mhagger wants to merge 9 commits into
znull/v2-early-close-regression-testsfrom
moar-streams
Open

Make the stream types more capable and use them in more places#58
mhagger wants to merge 9 commits into
znull/v2-early-close-regression-testsfrom
moar-streams

Conversation

@mhagger

@mhagger mhagger commented Jun 13, 2026

Copy link
Copy Markdown
Member

Now that we have the InputStream and OutputStream types, we might as well use them in more places and pin some more functionality on them:

  • First, fix Close() to pass any errors through.
  • Remove the Closer() methods to force the streams' Close() methods to be used instead. This allows us to put more functionality in Close(), like…
  • Make the Close() method idempotent. To make this possible, we first need to:
    • Turn InputStream and OutputStream into pointer types.
  • Use InputStream and OutputStream in the definition of Pipeline.

The commits are written to be readable one by one, and CI passes after each commit. This PR is against the branch from PR #57.

The next planned step

Changing the streams to pointer types is interesting when starting up stages: once you have successfully passed a stream to Stage.Start(), you can clear your own pointer and then you don't have to worry about accidentally calling Close() on a stream that is already owned by a stage. That, together with the fact that it is allowed to call Close() on nil pointers, will make the bookkeeping trivial. I've got some commits that change the helper class used by Pipeline.Start() and will try to push that soon. On the other hand, it's a pointer so it's visually a little bit more noisy.

The idempotency of Close() might be a little bit controversial. Maybe there should be an error if the client calls Close() twice, if we want the client to be forced to fix whatever's wrong? On the other hand, it would let Pipeline offer the service that it calls Close() on all streams when the corresponding stage finishes, in case the client forgot to do so, to add a little bit of belt 'n' suspenders. I'm open to being convinced that this is a bad idea.

/cc @znull

mhagger added 9 commits June 13, 2026 17:12
This is to force all close calls to come through `InputStream.Close()`
and `OutputStream.Close()`, which will soon get some more
functionality.
Ignore all but the first call to `Close()`.
In the type comments, explain why these types don't implement
`io.Reader` and `io.Writer`. Otherwise, some helpful person is sure to
come along and add `Read()` and `Write()` methods, to the detriment of
performance and even changing some semantics.
Change the types of some `Pipeline` fields:

* `stdin` to `InputStream`
* `stdout` to `OutputStream`

That way we don't have to manage their closers separately.
@mhagger mhagger requested a review from a team as a code owner June 13, 2026 16:50
Copilot AI review requested due to automatic review settings June 13, 2026 16:50

Copilot AI left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR evolves the stream ownership API by moving InputStream/OutputStream into dedicated types with idempotent Close() error, switching them to pointer semantics (including allowing nil stream pointers), and wiring these types through the Stage contract and Pipeline plumbing so stream closing behavior is consistently centralized in Close().

Changes:

  • Introduces pipe.InputStream / pipe.OutputStream pointer types with idempotent Close() error and nil-safe behavior.
  • Updates Stage.Start and all stage implementations/tests to accept *InputStream / *OutputStream and use Close() instead of Closer().
  • Refactors Pipeline to store stdin/stdout as stream types and pass them through stage wiring/abort paths.
Show a summary per file
File Description
pipe/streams.go Adds new pointer-based InputStream/OutputStream types with idempotent Close() error.
pipe/stage.go Updates Stage.Start signature and ownership docs/examples for stream pointers and Close().
pipe/pipeline.go Refactors pipeline stdin/stdout fields and stage wiring to use stream types throughout.
pipe/pipeline_test.go Updates test stages to use pointer stream parameters and Close() error.
pipe/pipe_matching_test.go Updates pipe sniffing test stage to the new stream pointer API.
pipe/function.go Updates Go-function stage wiring to close via stream Close() and propagate close errors.
pipe/env_stage.go Updates env-wrapping stage to the new stream pointer API.
pipe/command.go Updates command stage to use stream pointers and rely on stream Close() for ownership.
pipe/command_stdout_fastpath_test.go Adjusts fastpath tests to construct pointer-based output streams.
pipe/close_responsibility_test.go Updates close ownership tests to validate idempotent close behavior and counts.

Copilot's findings

Tip

Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comments suppressed due to low confidence (1)

pipe/command.go:122

  • closeEarlyClosers ignores errors from Close(). With streams now returning errors (and Wait() already propagating late-closer errors), this silently drops failures when closing our copy of stdin/stdout after cmd.Start(). Consider capturing the first close error and surfacing it from Wait() when cmd.Wait() succeeds, similar to how closeLateClosers() is handled.
	closeEarlyClosers := func() {
		for _, closer := range earlyClosers {
			_ = closer.Close()
		}
	}
  • Files reviewed: 10/10 changed files
  • Comments generated: 4

Comment thread pipe/streams.go
type InputStream struct {
reader io.Reader

// once is used to ensure that `Close() is only called once.
Comment thread pipe/streams.go
type OutputStream struct {
writer io.Writer

// once is used to ensure that `Close() is only called once.
Comment thread pipe/stage.go
Comment on lines +47 to 50
// cmd.Stdin = stdin.Reader() // Similarly for stdout
// cmd.Start(…)
// f.Close() // close our copy
// stdin.Close() // Close our copy
// cmd.Wait()
Comment thread pipe/stage.go
Comment on lines +59 to +62
// cmd.Stdin = stdin.Reader() // Similarly for stdout
// cmd.Start(…)
// cmd.Wait()
// r.Close()
// stdin.Close() // Close
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants