Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,9 @@ name: Build
on:
push:
tags: ['v*']
pull_request:
branches: [main]


jobs:

build:
strategy:
matrix:
Expand Down
39 changes: 39 additions & 0 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
name: Test & Build

on:
pull_request:
branches: [main, dev]

jobs:
test:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: actions/setup-go@v5
with:
go-version: '1.26'
cache: true

- name: Run tests
run: go test ./... -v
build:
strategy:
matrix:
arch: [amd64, arm64]
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4

- uses: actions/setup-go@v5
with:
go-version: '1.26'
cache: true

- name: Build yatund
run: |
GOOS=linux GOARCH=${{ matrix.arch }} CGO_ENABLED=0 go build -o build/yatund-${{ matrix.arch }} ./cmd/yatund/

- name: Build yatun
run: |
GOOS=linux GOARCH=${{ matrix.arch }} CGO_ENABLED=0 go build -o build/yatun-${{ matrix.arch }} ./cmd/yatun/

4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1 +1,5 @@
.env
/yatun
/yatund
*.exe

4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@

```
┌──────────┐ TCP ┌────────────────┐ yamux ┌──────────┐ TCP ┌─────────┐
│ Internet │───────────▶│ yatund │◀══════════▶│ yatun │────────▶│ local │
│ Client │ random port│ (relay server) │ session │ (agent) │ :port │ service │
│ Internet │───────────▶│ yatund │◀═════════▶│ yatun │────────▶│ local │
│ Client │ random port│ (relay server)│ session │ (agent) │ :port │ service │
└──────────┘ └────────────────┘ └──────────┘ └─────────┘
```

Expand Down
198 changes: 131 additions & 67 deletions cmd/yatun/main.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
package main

import (
"errors"
"context"
"flag"
"fmt"
"io"

"os"
"sync/atomic"

"sync"
"time"

Expand All @@ -18,34 +19,51 @@ import (
"github.com/hashicorp/yamux"
)

func sendMsg(con *yamux.Stream, m message.TransportMessage) {
byt := m.Encode()
func sendMsg(con *yamux.Stream, m message.TransportMessage) (err error) {
byt, err := m.Encode()
if err != nil {

return fmt.Errorf("failed to encode message: %w", err)
}

_, err = con.Write(byt)
return

con.Write(byt)
}

func clientServerComms(ses *yamux.Session, tuiP *tea.Program) {
func clientServerComms(ses *yamux.Session, tuiP *tea.Program) (err error) {
con, err := ses.OpenStream()
if err != nil {
tuiP.Kill()
panic(errors.New("failed to accept new stream, is the server running?"))
}

dat := message.ConnectionDetailsMessageData{
SubdomainName: "asd",
tuiP.Send(tui.SetState{
State: tui.ErrorState,
Err: err,
})
tuiP.Quit()
return
}

sendMsg(con, message.TransportMessage{
Type: message.ConnectionDetails,
Data: &dat,
err = sendMsg(con, message.TransportMessage{
Type: message.OpenMsg,
})
if err != nil {
tuiP.Send(tui.SetState{
State: tui.ErrorState,
Err: fmt.Errorf("failed to send initial message to server, is the server running?\n%v", err),
})
tuiP.Quit()
return err
}

go func() {
for {
msg, err := message.Decode(con)
if err != nil {
tuiP.Kill()
panic(errors.New("the server closed unexpectedly"))
tuiP.Send(tui.SetState{
State: tui.ErrorState,
Err: fmt.Errorf("failed at decoding server message\n%v", err),
})
tuiP.Quit()
return
}

switch msg.Type {
Expand All @@ -60,6 +78,7 @@ func clientServerComms(ses *yamux.Session, tuiP *tea.Program) {
}
}
}()
return nil
}

func initializeServerConnection(tuiP *tea.Program, server string) (sess *yamux.Session, err error) {
Expand All @@ -74,7 +93,7 @@ func initializeServerConnection(tuiP *tea.Program, server string) (sess *yamux.S
return
}

clientServerComms(sess, tuiP)
err = clientServerComms(sess, tuiP)

return
}
Expand All @@ -83,82 +102,121 @@ type trafficMonitor struct {
underlying io.ReadWriter
tuiP *tea.Program
streamType tui.TrafficDirection

bytesTransferred *atomic.Int64
}

func (c trafficMonitor) Read(p []byte) (n int, err error) {
n, err = c.underlying.Read(p)
go c.tuiP.Send(tui.TrafficUpdate{
Direction: c.streamType,
Bytes: n,
})
c.bytesTransferred.Add(int64(n))
return
}
func (c trafficMonitor) Write(p []byte) (n int, err error) {
return c.underlying.Write(p)

}

func serverConnectionLoop(sess *yamux.Session, port *string, tuiP *tea.Program) {
// TODO: After initial handshake is done, io.Copy from server (yatun) to internal target server
for {
func handleStream(ctx context.Context, stream *yamux.Stream, port *string, tuiP *tea.Program) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()

stream, err := sess.AcceptStream()
if err != nil {
// TODO: Maybe? send a message to the TUI so that the user knows it is having trouble getting new sessions from server
tuiP.Send(tui.SetState{
Err: err,
State: tui.ErrorState,
})
return
}
tuiP.Send(tui.LiveConnection)
defer tuiP.Send(tui.DeadConnection)
defer stream.Close()

go func() {
tuiP.Send(tui.LiveConnection)
defer tuiP.Send(tui.DeadConnection)
defer stream.Close()
localConn, err := net.DialTimeout("tcp", fmt.Sprintf("localhost:%v", *port), time.Second*10)
if err != nil {
tuiP.Send(tui.LocalConnectionError)
// Notify to the TUI the error, maybe the server is down?
return
}
defer localConn.Close()

localConn, err := net.DialTimeout("tcp", fmt.Sprintf("localhost:%v", *port), time.Second*10)
if err != nil {
tuiP.Send(tui.LocalConnectionError)
// Notify to the TUI the error, maybe the server is down?
go func() {
<-ctx.Done()

localConn.Close()
stream.Close()
}()

streamMonitor := trafficMonitor{
underlying: stream,
tuiP: tuiP,
streamType: tui.Inbound,
bytesTransferred: &atomic.Int64{},
}
localConnMonitor := trafficMonitor{
underlying: localConn,
tuiP: tuiP,
streamType: tui.Outbound,
bytesTransferred: &atomic.Int64{},
}

go func() {
t := time.NewTicker(time.Second * 5)
defer t.Stop()

for {
select {
case <-t.C:
// The TUI already sums the data internally, so the right call is Swap instead of load, this could also be used to measure throughput
tuiP.Send(tui.TrafficUpdate{
Direction: streamMonitor.streamType,
Bytes: int(streamMonitor.bytesTransferred.Swap(0)),
})
tuiP.Send(tui.TrafficUpdate{
Direction: localConnMonitor.streamType,
Bytes: int(localConnMonitor.bytesTransferred.Swap(0)),
})

case <-ctx.Done():
return
}
defer localConn.Close()
}
}()

streamCopier := trafficMonitor{
underlying: stream,
tuiP: tuiP,
streamType: tui.Inbound,
}
localConnCopier := trafficMonitor{
underlying: localConn,
tuiP: tuiP,
streamType: tui.Outbound,
}
wg := sync.WaitGroup{}

wg := sync.WaitGroup{}
wg.Go(func() {

wg.Go(func() {
io.Copy(streamMonitor, localConnMonitor)
cancel()

io.Copy(streamCopier, localConnCopier)
localConn.Close()
})

})
wg.Go(func() {
io.Copy(localConnMonitor, streamMonitor)
cancel()

wg.Go(func() {
io.Copy(localConnCopier, streamCopier)
stream.Close()
})

})
wg.Wait()
}

wg.Wait()
func serverConnectionLoop(ctx context.Context, sess *yamux.Session, port *string, tuiP *tea.Program) {
// TODO: After initial handshake is done, io.Copy from server (yatun) to internal target server
for {

}()
stream, err := sess.AcceptStreamWithContext(ctx)
if err != nil {
// TODO: Maybe? send a message to the TUI so that the user knows it is having trouble getting new sessions from server
tuiP.Send(tui.SetState{
Err: err,
State: tui.ErrorState,
})
tuiP.Quit()
return
}

go handleStream(ctx, stream, port, tuiP)
}

}

func main() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

port := flag.String("port", "", "--port")
server := flag.String("server", "yatun.snowdev.one", "--server")

Expand All @@ -175,20 +233,26 @@ func main() {
sess, err := initializeServerConnection(tuiP, *server)
if err != nil {
go tuiP.Send(tui.SetState{
Err: err,

Err: &tui.FailedInitialConfigError{
Err: err},
State: tui.ErrorState,
})
// tuiP.Quit()
}

if err == nil {
go tuiP.Send(tui.SetState{
State: tui.OnlineState,
})
go serverConnectionLoop(sess, port, tuiP)
go serverConnectionLoop(ctx, sess, port, tuiP)
}

if _, err := tuiP.Run(); err != nil {
sess.Close()
if sess != nil {
sess.Close()
}
cancel()
os.Exit(1)
}
}
Loading
Loading