diff options
| author | JP Appel <jeanpierre.appel01@gmail.com> | 2025-07-22 15:41:03 -0400 |
|---|---|---|
| committer | JP Appel <jeanpierre.appel01@gmail.com> | 2025-07-22 15:41:03 -0400 |
| commit | 344c6526a8d6f490fc7628ddc7d2dd06ed1a07c1 (patch) | |
| tree | 342878ff5d77b557533d6e5473e1d8f6e79ac6e9 /pkg | |
| parent | faf35ef54885bc48b897508ce3cb40b868ff505b (diff) | |
Separate program entry point from commands
Diffstat (limited to 'pkg')
| -rw-r--r-- | pkg/data/update.go | 2 | ||||
| -rw-r--r-- | pkg/index/filters.go | 4 | ||||
| -rw-r--r-- | pkg/index/index.go | 9 | ||||
| -rw-r--r-- | pkg/query/errors.go | 2 | ||||
| -rw-r--r-- | pkg/query/parser.go | 5 | ||||
| -rw-r--r-- | pkg/server/server.go | 8 | ||||
| -rw-r--r-- | pkg/server/unix.go | 89 |
7 files changed, 111 insertions, 8 deletions
diff --git a/pkg/data/update.go b/pkg/data/update.go index a8a4b3d..1a50563 100644 --- a/pkg/data/update.go +++ b/pkg/data/update.go @@ -201,6 +201,7 @@ func (u *UpdateMany) documents() (bool, error) { SELECT path FROM temp.updateDocs )`) if err != nil { + slog.Debug("Failed to remove missing files from index") return false, err } @@ -215,6 +216,7 @@ func (u *UpdateMany) documents() (bool, error) { WHERE excluded.fileTime > Documents.fileTime `) if err != nil { + slog.Debug("Failed document upsert") return false, err } diff --git a/pkg/index/filters.go b/pkg/index/filters.go index 920d5df..bd9df54 100644 --- a/pkg/index/filters.go +++ b/pkg/index/filters.go @@ -50,10 +50,10 @@ func ParseFilter(s string) (DocFilter, error) { } return NewMaxFilesizeFilter(size), nil case "ExcludeName", "ExcludeFilename": - // FIXME: support escaped commas + // TODO: support escaped commas return NewExcludeFilenameFilter(strings.Split(param, ",")), nil case "IncludeName", "IncludeFilename": - // FIXME: support escaped commas + // TODO: support escaped commas return NewIncludeFilenameFilter(strings.Split(param, ",")), nil case "ExcludeParent": return NewExcludeParentFilter(param), nil diff --git a/pkg/index/index.go b/pkg/index/index.go index cfa4138..50f642e 100644 --- a/pkg/index/index.go +++ b/pkg/index/index.go @@ -12,6 +12,7 @@ import ( "slices" "strings" "sync" + "sync/atomic" "time" "github.com/goccy/go-yaml" @@ -383,21 +384,23 @@ func ParseDoc(path string, opts ParseOpts) (*Document, error) { return doc, nil } -func ParseDocs(paths []string, numWorkers uint, opts ParseOpts) map[string]*Document { +func ParseDocs(paths []string, numWorkers uint, opts ParseOpts) (map[string]*Document, uint64) { jobs := make(chan string, numWorkers) results := make(chan *Document, numWorkers) docs := make(map[string]*Document, len(paths)) wg := &sync.WaitGroup{} + errCnt := &atomic.Uint64{} wg.Add(int(numWorkers)) for range numWorkers { go func(jobs <-chan string, results chan<- *Document, wg *sync.WaitGroup) { for path := range jobs { doc, err := ParseDoc(path, opts) if err != nil { - slog.Error("Error occured while parsing file", + slog.Warn("Error occured while parsing file", slog.String("path", path), slog.String("err", err.Error()), ) + errCnt.Add(1) continue } @@ -423,7 +426,7 @@ func ParseDocs(paths []string, numWorkers uint, opts ParseOpts) map[string]*Docu docs[doc.Path] = doc } - return docs + return docs, errCnt.Load() } func init() { diff --git a/pkg/query/errors.go b/pkg/query/errors.go index 35f8c19..889d40d 100644 --- a/pkg/query/errors.go +++ b/pkg/query/errors.go @@ -6,7 +6,7 @@ import ( ) var ErrQueryFormat = errors.New("Incorrect query format") -var ErrDatetimeTokenParse = errors.New("Unrecognized format for datetime token") +var ErrDatetimeTokenParse = errors.New("Unrecognized format for datetime") // output errors var ErrUnrecognizedOutputToken = errors.New("Unrecognized output token") diff --git a/pkg/query/parser.go b/pkg/query/parser.go index e22b4c3..53681f6 100644 --- a/pkg/query/parser.go +++ b/pkg/query/parser.go @@ -549,7 +549,10 @@ func Parse(tokens []Token) (*Clause, error) { var t time.Time var err error if t, err = util.ParseDateTime(token.Value); err != nil { - return nil, ErrDatetimeTokenParse + return nil, fmt.Errorf("Cannot parse time `%s`, %v", + token.Value, + ErrDatetimeTokenParse, + ) } clause.Statements[len(clause.Statements)-1].Value = DatetimeValue{t} diff --git a/pkg/server/server.go b/pkg/server/server.go index c08d5a4..a7a5395 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -2,6 +2,7 @@ package server import ( "bytes" + "context" "io" "log/slog" "net/http" @@ -14,6 +15,11 @@ import ( "github.com/jpappel/atlas/pkg/query" ) +type Server interface { + ListenAndServe() error + Shutdown(context.Context) error +} + func info(w http.ResponseWriter, r *http.Request) { w.Write([]byte(` <h1>Atlas Server</h1> @@ -22,7 +28,7 @@ func info(w http.ResponseWriter, r *http.Request) { `)) } -func New(db *data.Query) *http.ServeMux { +func NewMux(db *data.Query) *http.ServeMux { mux := http.NewServeMux() outputBufPool := &sync.Pool{} diff --git a/pkg/server/unix.go b/pkg/server/unix.go new file mode 100644 index 0000000..87b425e --- /dev/null +++ b/pkg/server/unix.go @@ -0,0 +1,89 @@ +package server + +import ( + "bytes" + "context" + "fmt" + "log/slog" + "net" + "os" + "sync" + + "github.com/jpappel/atlas/pkg/data" +) + +// datagram based unix server +type UnixServer struct { + Addr string + Db *data.Query + shouldClose chan struct{} + bufPool *sync.Pool +} + +func (s *UnixServer) ListenAndServe() error { + slog.Info("Listening on", slog.String("addr", s.Addr)) + conn, err := net.ListenUnixgram( + "unixgram", + &net.UnixAddr{Name: s.Addr, Net: "Unix"}, + ) + if err != nil { + return err + } + defer conn.Close() + defer os.RemoveAll(s.Addr) + slog.Debug("Accepted connection") + + // slog.Info("New Connection", + // slog.String("addr", conn.RemoteAddr().String()), + // ) + + s.bufPool = &sync.Pool{} + s.bufPool.New = func() any { + return &bytes.Buffer{} + } + s.handleConn(conn) + + return nil +} + +func (s UnixServer) handleConn(conn *net.UnixConn) { + // buf, ok := s.bufPool.Get().(*bytes.Buffer) + // if !ok { + // panic("Expected *bytes.Buffer in pool") + // } + buf := make([]byte, 1024) + + n, err := conn.Read(buf) + if err != nil { + panic(err) + } + buf = buf[:n] + + fmt.Println(string(buf)) + + // if _, err := io.Copy(buf, conn); err != nil { + // panic(err) + // } + // defer buf.Reset() + // + // io.Copy(os.Stdout, buf) + + // artifact, err := query.Compile(buf.String(), 0, 1) + // if err != nil { + // panic(err) + // } + // + // docs, err := s.Db.Execute(artifact) + // if err != nil { + // panic(err) + // } + // + // _, err = query.DefaultOutput{}.OutputTo(conn, slices.Collect(maps.Values(docs))) + // if err != nil { + // panic(err) + // } +} + +func (s *UnixServer) Shutdown(ctx context.Context) error { + return nil +} |
