Documentation ¶
Index ¶
Constants ¶
This section is empty.
Variables ¶
View Source
var WarpPipeCmd = &cobra.Command{ Use: "warp-pipe", Short: "Run a warp-pipe", Long: `Run a warp-pipe and stream changes from a Postgres database.`, RunE: func(cmd *cobra.Command, _ []string) error { config, err := parseConfig() if err != nil { return err } listener, err := initListener(config) if err != nil { return err } wp := warppipe.NewWarpPipe( listener, warppipe.IgnoreTables(config.IgnoreTables), warppipe.WhitelistTables(config.WhitelistTables), warppipe.LogLevel(config.LogLevel), ) if err := wp.Open(&config.Database); err != nil { log.Fatal(err) } ctx, cancel := context.WithCancel(context.Background()) changes, errors := wp.ListenForChanges(ctx) go func() { for { select { case change := <-changes: b, err := json.Marshal(change) if err != nil { log.Error(err) } fmt.Println(string(b)) case err := <-errors: log.Error(err) } } }() shutdownCh := make(chan os.Signal, 1) signal.Notify(shutdownCh, os.Interrupt, syscall.SIGTERM) for { <-shutdownCh cancel() if err := wp.Close(); err != nil { return err } return nil } }, }
WarpPipeCmd is the root command.
Functions ¶
This section is empty.
Types ¶
This section is empty.
Click to show internal directories.
Click to hide internal directories.