Merge pull request #3588 from katiewasnothere/exposeEventPublisher
Export shim publisher functions
This commit is contained in:
commit
0293cbd26c
@ -41,13 +41,13 @@ type item struct {
|
|||||||
count int
|
count int
|
||||||
}
|
}
|
||||||
|
|
||||||
func newPublisher(address string) (*remoteEventsPublisher, error) {
|
func NewPublisher(address string) (*RemoteEventsPublisher, error) {
|
||||||
client, err := ttrpcutil.NewClient(address)
|
client, err := ttrpcutil.NewClient(address)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
l := &remoteEventsPublisher{
|
l := &RemoteEventsPublisher{
|
||||||
client: client,
|
client: client,
|
||||||
closed: make(chan struct{}),
|
closed: make(chan struct{}),
|
||||||
requeue: make(chan *item, queueSize),
|
requeue: make(chan *item, queueSize),
|
||||||
@ -57,18 +57,18 @@ func newPublisher(address string) (*remoteEventsPublisher, error) {
|
|||||||
return l, nil
|
return l, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
type remoteEventsPublisher struct {
|
type RemoteEventsPublisher struct {
|
||||||
client *ttrpcutil.Client
|
client *ttrpcutil.Client
|
||||||
closed chan struct{}
|
closed chan struct{}
|
||||||
closer sync.Once
|
closer sync.Once
|
||||||
requeue chan *item
|
requeue chan *item
|
||||||
}
|
}
|
||||||
|
|
||||||
func (l *remoteEventsPublisher) Done() <-chan struct{} {
|
func (l *RemoteEventsPublisher) Done() <-chan struct{} {
|
||||||
return l.closed
|
return l.closed
|
||||||
}
|
}
|
||||||
|
|
||||||
func (l *remoteEventsPublisher) Close() (err error) {
|
func (l *RemoteEventsPublisher) Close() (err error) {
|
||||||
err = l.client.Close()
|
err = l.client.Close()
|
||||||
l.closer.Do(func() {
|
l.closer.Do(func() {
|
||||||
close(l.closed)
|
close(l.closed)
|
||||||
@ -76,7 +76,7 @@ func (l *remoteEventsPublisher) Close() (err error) {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (l *remoteEventsPublisher) processQueue() {
|
func (l *RemoteEventsPublisher) processQueue() {
|
||||||
for i := range l.requeue {
|
for i := range l.requeue {
|
||||||
if i.count > maxRequeue {
|
if i.count > maxRequeue {
|
||||||
logrus.Errorf("evicting %s from queue because of retry count", i.ev.Topic)
|
logrus.Errorf("evicting %s from queue because of retry count", i.ev.Topic)
|
||||||
@ -91,7 +91,7 @@ func (l *remoteEventsPublisher) processQueue() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (l *remoteEventsPublisher) queue(i *item) {
|
func (l *RemoteEventsPublisher) queue(i *item) {
|
||||||
go func() {
|
go func() {
|
||||||
i.count++
|
i.count++
|
||||||
// re-queue after a short delay
|
// re-queue after a short delay
|
||||||
@ -100,7 +100,7 @@ func (l *remoteEventsPublisher) queue(i *item) {
|
|||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (l *remoteEventsPublisher) Publish(ctx context.Context, topic string, event events.Event) error {
|
func (l *RemoteEventsPublisher) Publish(ctx context.Context, topic string, event events.Event) error {
|
||||||
ns, err := namespaces.NamespaceRequired(ctx)
|
ns, err := namespaces.NamespaceRequired(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
@ -127,7 +127,7 @@ func (l *remoteEventsPublisher) Publish(ctx context.Context, topic string, event
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (l *remoteEventsPublisher) forwardRequest(ctx context.Context, req *v1.ForwardRequest) error {
|
func (l *RemoteEventsPublisher) forwardRequest(ctx context.Context, req *v1.ForwardRequest) error {
|
||||||
_, err := l.client.EventsService().Forward(ctx, req)
|
_, err := l.client.EventsService().Forward(ctx, req)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
return nil
|
return nil
|
||||||
|
@ -169,7 +169,7 @@ func run(id string, initFunc Init, config Config) error {
|
|||||||
|
|
||||||
ttrpcAddress := os.Getenv(ttrpcAddressEnv)
|
ttrpcAddress := os.Getenv(ttrpcAddressEnv)
|
||||||
|
|
||||||
publisher, err := newPublisher(ttrpcAddress)
|
publisher, err := NewPublisher(ttrpcAddress)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user