aboutsummaryrefslogtreecommitdiffstats
path: root/pkg/server/unix.go
diff options
context:
space:
mode:
authorJP Appel <jeanpierre.appel01@gmail.com>2025-07-23 17:27:23 -0400
committerJP Appel <jeanpierre.appel01@gmail.com>2025-07-23 17:27:23 -0400
commit3b84e41e8fec9ecf5c9f4885c8908cf94b8999f5 (patch)
tree98a09e3ec84ab54ca5533034bdb891f8366ddfc0 /pkg/server/unix.go
parent325b23a109527a8ff385889e47b5ea8a102e24db (diff)
Implement basic server over unix datagram
Diffstat (limited to 'pkg/server/unix.go')
-rw-r--r--pkg/server/unix.go121
1 files changed, 66 insertions, 55 deletions
diff --git a/pkg/server/unix.go b/pkg/server/unix.go
index 87b425e..3b2ee9b 100644
--- a/pkg/server/unix.go
+++ b/pkg/server/unix.go
@@ -3,87 +3,98 @@ package server
import (
"bytes"
"context"
- "fmt"
"log/slog"
"net"
"os"
- "sync"
"github.com/jpappel/atlas/pkg/data"
+ "github.com/jpappel/atlas/pkg/query"
)
// datagram based unix server
type UnixServer struct {
- Addr string
- Db *data.Query
- shouldClose chan struct{}
- bufPool *sync.Pool
+ Addr string
+ Db *data.Query
+ conn *net.UnixConn
}
func (s *UnixServer) ListenAndServe() error {
- slog.Info("Listening on", slog.String("addr", s.Addr))
- conn, err := net.ListenUnixgram(
+ serverAddr := s.Addr + "_server.sock"
+ clientAddr := s.Addr + "_client.sock"
+
+ var err error
+ s.conn, err = net.ListenUnixgram(
"unixgram",
- &net.UnixAddr{Name: s.Addr, Net: "Unix"},
+ &net.UnixAddr{Name: serverAddr, Net: "Unix"},
)
if err != nil {
return err
}
- defer conn.Close()
defer os.RemoveAll(s.Addr)
- slog.Debug("Accepted connection")
-
- // slog.Info("New Connection",
- // slog.String("addr", conn.RemoteAddr().String()),
- // )
-
- s.bufPool = &sync.Pool{}
- s.bufPool.New = func() any {
- return &bytes.Buffer{}
- }
- s.handleConn(conn)
-
- return nil
-}
-
-func (s UnixServer) handleConn(conn *net.UnixConn) {
- // buf, ok := s.bufPool.Get().(*bytes.Buffer)
- // if !ok {
- // panic("Expected *bytes.Buffer in pool")
- // }
- buf := make([]byte, 1024)
+ slog.Info("Listening on", slog.String("addr", s.Addr))
- n, err := conn.Read(buf)
+ var remote *net.UnixAddr
+ remote, err = net.ResolveUnixAddr("unixgram", clientAddr)
if err != nil {
panic(err)
}
- buf = buf[:n]
+ // FIXME: limits queries to 1kb, might have some data overflow into next msg
+ buf := make([]byte, 1024)
+ for {
+ n, _, err := s.conn.ReadFromUnix(buf)
+ if err != nil {
+ return err
+ }
+ buf = buf[:n]
+ queryTxt := string(buf)
+ slog.Debug("New message",
+ slog.String("msg", queryTxt),
+ slog.String("local", s.conn.LocalAddr().String()),
+ slog.String("remote", remote.String()),
+ )
+
+ // TODO: set reasonable numWorkers
+ // TODO: rwrite error to remote
+ artifact, err := query.Compile(queryTxt, 0, 2)
+ if err != nil {
+ slog.Error("Failed to compile query", slog.String("err", err.Error()))
+ return err
+ }
- fmt.Println(string(buf))
+ // TODO: write error to remote
+ docs, err := s.Db.Execute(artifact)
+ if err != nil {
+ slog.Error("Failed to execute query",
+ slog.String("err", err.Error()))
+ return err
+ }
- // if _, err := io.Copy(buf, conn); err != nil {
- // panic(err)
- // }
- // defer buf.Reset()
- //
- // io.Copy(os.Stdout, buf)
+ buf := &bytes.Buffer{}
+ o := query.DefaultOutput{}
+ for _, doc := range docs {
+ n, err = o.WriteDoc(buf, doc)
+ if err != nil {
+ return err
+ }
- // artifact, err := query.Compile(buf.String(), 0, 1)
- // if err != nil {
- // panic(err)
- // }
- //
- // docs, err := s.Db.Execute(artifact)
- // if err != nil {
- // panic(err)
- // }
- //
- // _, err = query.DefaultOutput{}.OutputTo(conn, slices.Collect(maps.Values(docs)))
- // if err != nil {
- // panic(err)
- // }
+ b := buf.Bytes()
+ remaining := len(b)
+ offset := 0
+ for remaining > 0 {
+ n, err := s.conn.WriteToUnix(b[offset:remaining], remote)
+ if err != nil {
+ return err
+ }
+ remaining -= n
+ offset += n
+ }
+ buf.Reset()
+ }
+ // EOF
+ s.conn.WriteToUnix([]byte{4}, remote)
+ }
}
func (s *UnixServer) Shutdown(ctx context.Context) error {
- return nil
+ return s.conn.Close()
}