From 3b84e41e8fec9ecf5c9f4885c8908cf94b8999f5 Mon Sep 17 00:00:00 2001 From: JP Appel Date: Wed, 23 Jul 2025 17:27:23 -0400 Subject: Implement basic server over unix datagram --- cmd/help.go | 2 +- pkg/query/outputs.go | 10 ++--- pkg/server/unix.go | 121 ++++++++++++++++++++++++++++----------------------- 3 files changed, 72 insertions(+), 61 deletions(-) diff --git a/cmd/help.go b/cmd/help.go index e57ce2f..9d59f53 100644 --- a/cmd/help.go +++ b/cmd/help.go @@ -104,7 +104,7 @@ func Help(topic string, w io.Writer) { shell.PrintHelp(w) case "server": SetupServerFlags(nil, fs, &ServerFlags{}) - fmt.Fprintf(w, "%s [global-flags] server [server-flags]", os.Args[0]) + fmt.Fprintf(w, "%s [global-flags] server [server-flags]\n", os.Args[0]) fmt.Fprintln(w, "Run a server to execute queries over HTTP or a unix domain socket") fmt.Fprintln(w, "HTTP Server:") fmt.Fprintln(w, " To execute a query POST it in the request body to /search") diff --git a/pkg/query/outputs.go b/pkg/query/outputs.go index 50a6c4d..1fb7214 100644 --- a/pkg/query/outputs.go +++ b/pkg/query/outputs.go @@ -54,20 +54,20 @@ var _ Outputer = &CustomOutput{} // and a nil error func (o DefaultOutput) OutputOne(doc *index.Document) (string, error) { b := strings.Builder{} - o.writeDoc(&b, doc) + o.WriteDoc(&b, doc) return b.String(), nil } func (o DefaultOutput) OutputOneTo(w io.Writer, doc *index.Document) (int, error) { - return o.writeDoc(w, doc) + return o.WriteDoc(w, doc) } func (o DefaultOutput) Output(docs []*index.Document) (string, error) { b := strings.Builder{} for i, doc := range docs { - o.writeDoc(&b, doc) + o.WriteDoc(&b, doc) if i != len(docs)-1 { b.WriteRune('\n') } @@ -79,7 +79,7 @@ func (o DefaultOutput) Output(docs []*index.Document) (string, error) { func (o DefaultOutput) OutputTo(w io.Writer, docs []*index.Document) (int, error) { n := 0 for _, doc := range docs { - nn, err := o.writeDoc(w, doc) + nn, err := o.WriteDoc(w, doc) if err != nil { return n, err } @@ -90,7 +90,7 @@ func (o DefaultOutput) OutputTo(w io.Writer, docs []*index.Document) (int, error return n, nil } -func (o DefaultOutput) writeDoc(w io.Writer, doc *index.Document) (int, error) { +func (o DefaultOutput) WriteDoc(w io.Writer, doc *index.Document) (int, error) { var n int s := [][]byte{ []byte(doc.Path), diff --git a/pkg/server/unix.go b/pkg/server/unix.go index 87b425e..3b2ee9b 100644 --- a/pkg/server/unix.go +++ b/pkg/server/unix.go @@ -3,87 +3,98 @@ package server import ( "bytes" "context" - "fmt" "log/slog" "net" "os" - "sync" "github.com/jpappel/atlas/pkg/data" + "github.com/jpappel/atlas/pkg/query" ) // datagram based unix server type UnixServer struct { - Addr string - Db *data.Query - shouldClose chan struct{} - bufPool *sync.Pool + Addr string + Db *data.Query + conn *net.UnixConn } func (s *UnixServer) ListenAndServe() error { - slog.Info("Listening on", slog.String("addr", s.Addr)) - conn, err := net.ListenUnixgram( + serverAddr := s.Addr + "_server.sock" + clientAddr := s.Addr + "_client.sock" + + var err error + s.conn, err = net.ListenUnixgram( "unixgram", - &net.UnixAddr{Name: s.Addr, Net: "Unix"}, + &net.UnixAddr{Name: serverAddr, Net: "Unix"}, ) if err != nil { return err } - defer conn.Close() defer os.RemoveAll(s.Addr) - slog.Debug("Accepted connection") - - // slog.Info("New Connection", - // slog.String("addr", conn.RemoteAddr().String()), - // ) - - s.bufPool = &sync.Pool{} - s.bufPool.New = func() any { - return &bytes.Buffer{} - } - s.handleConn(conn) - - return nil -} - -func (s UnixServer) handleConn(conn *net.UnixConn) { - // buf, ok := s.bufPool.Get().(*bytes.Buffer) - // if !ok { - // panic("Expected *bytes.Buffer in pool") - // } - buf := make([]byte, 1024) + slog.Info("Listening on", slog.String("addr", s.Addr)) - n, err := conn.Read(buf) + var remote *net.UnixAddr + remote, err = net.ResolveUnixAddr("unixgram", clientAddr) if err != nil { panic(err) } - buf = buf[:n] + // FIXME: limits queries to 1kb, might have some data overflow into next msg + buf := make([]byte, 1024) + for { + n, _, err := s.conn.ReadFromUnix(buf) + if err != nil { + return err + } + buf = buf[:n] + queryTxt := string(buf) + slog.Debug("New message", + slog.String("msg", queryTxt), + slog.String("local", s.conn.LocalAddr().String()), + slog.String("remote", remote.String()), + ) + + // TODO: set reasonable numWorkers + // TODO: rwrite error to remote + artifact, err := query.Compile(queryTxt, 0, 2) + if err != nil { + slog.Error("Failed to compile query", slog.String("err", err.Error())) + return err + } - fmt.Println(string(buf)) + // TODO: write error to remote + docs, err := s.Db.Execute(artifact) + if err != nil { + slog.Error("Failed to execute query", + slog.String("err", err.Error())) + return err + } - // if _, err := io.Copy(buf, conn); err != nil { - // panic(err) - // } - // defer buf.Reset() - // - // io.Copy(os.Stdout, buf) + buf := &bytes.Buffer{} + o := query.DefaultOutput{} + for _, doc := range docs { + n, err = o.WriteDoc(buf, doc) + if err != nil { + return err + } - // artifact, err := query.Compile(buf.String(), 0, 1) - // if err != nil { - // panic(err) - // } - // - // docs, err := s.Db.Execute(artifact) - // if err != nil { - // panic(err) - // } - // - // _, err = query.DefaultOutput{}.OutputTo(conn, slices.Collect(maps.Values(docs))) - // if err != nil { - // panic(err) - // } + b := buf.Bytes() + remaining := len(b) + offset := 0 + for remaining > 0 { + n, err := s.conn.WriteToUnix(b[offset:remaining], remote) + if err != nil { + return err + } + remaining -= n + offset += n + } + buf.Reset() + } + // EOF + s.conn.WriteToUnix([]byte{4}, remote) + } } func (s *UnixServer) Shutdown(ctx context.Context) error { - return nil + return s.conn.Close() } -- cgit v1.2.3