Documentation ¶
Overview ¶
Package queue implements several types of queues, notably an asynchronous queue for non-blocking data processing and an SQLite3-backed queue.
Constants ¶
This section is empty.
Variables ¶
var ErrEmpty = errors.New("queue: queue is empty")
ErrEmpty is returned when dequeuing from an empty queue.
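A caller usually wants to tell "nothing to read right now" apart from a real failure. The sketch below shows one way to do that with errors.Is. It redeclares ErrEmpty locally so it runs on its own; dequeueFunc and drain are hypothetical names used only for illustration and are not part of the package.

```go
package main

import (
	"errors"
	"fmt"
)

// ErrEmpty mirrors the package's sentinel so this sketch compiles on its own.
var ErrEmpty = errors.New("queue: queue is empty")

// dequeueFunc stands in for a queue's Dequeue method (hypothetical type).
type dequeueFunc func() ([]byte, error)

// drain calls dequeue until it reports ErrEmpty and returns everything read.
// Any other error stops the loop and is returned to the caller.
func drain(dequeue dequeueFunc) ([][]byte, error) {
	var out [][]byte
	for {
		data, err := dequeue()
		if errors.Is(err, ErrEmpty) {
			return out, nil // an empty queue is not a failure here
		}
		if err != nil {
			return out, err
		}
		out = append(out, data)
	}
}

func main() {
	items := [][]byte{[]byte("a"), []byte("b")}
	dequeue := func() ([]byte, error) {
		if len(items) == 0 {
			return nil, ErrEmpty
		}
		head := items[0]
		items = items[1:]
		return head, nil
	}
	got, err := drain(dequeue)
	fmt.Println(len(got), err) // prints: 2 <nil>
}
```

errors.Is also matches a wrapped ErrEmpty, which a plain == comparison would miss.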
Functions ¶
This section is empty.
Types ¶
type AsyncQueue ¶
type AsyncQueue interface {
// Add data to the queue. Safe for concurrent use.
Enqueue(data []byte) error
// Close the queue. Can be done at any point after the queue is
// constructed.
io.Closer
}
AsyncQueue processes queue data asynchronously.
Example ¶
An example of using the async queue to process data.
package main

import (
	"fmt"
	"log"
	"time"

	"github.com/esote/queue"
)

func main() {
	sqlite3, err := queue.NewSqlite3Queue("test.db")
	if err != nil {
		log.Fatal(err)
	}
	defer sqlite3.Close()

	handler := func(data []byte, err error) {
		if err == queue.ErrEmpty {
			// If the queue will usually be empty, here you can
			// sleep or use exp-backoff to avoid busy-waiting. With
			// more knowledge of enqueues it's possible to sleep
			// indefinitely and wake when data is added.
			time.Sleep(100 * time.Millisecond)
			return
		}
		if err != nil {
			log.Println(err)
			return
		}
		fmt.Println(string(data))
	}

	q, err := queue.NewAsyncQueue(sqlite3, handler, 2)
	if err != nil {
		log.Fatal(err)
	}
	defer q.Close()

	msgs := []string{"hi", "hello", "hey", "hiya"}
	for _, msg := range msgs {
		if err = q.Enqueue([]byte(msg)); err != nil {
			log.Fatal(err)
		}
	}

	// The messages will be dequeued "eventually". Closing the async queue
	// does not block until the queue is empty; it only blocks long enough
	// to ensure all data is accounted for (either stored in the queue or
	// done being used by a handler). However, for the purpose of this
	// example we wait to give the async queue time to process its data.
	time.Sleep(500 * time.Millisecond)
}
Output:
hi
hello
hey
hiya
func NewAsyncQueue ¶
func NewAsyncQueue(q Queue, handler Handler, workers int) (AsyncQueue, error)
NewAsyncQueue creates an async queue that processes inner queue data through a handler and worker pool. Closing the async queue does NOT close the inner queue.
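To make the "handler and worker pool" idea concrete, here is a minimal sketch of how a pool of goroutines could feed dequeued data to a handler, and why closing the pool can leave the inner queue open. This is an illustration under stated assumptions, not the package's implementation: sliceQueue, pool, and startPool are hypothetical names, and ErrEmpty is redeclared locally so the block runs on its own.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

// ErrEmpty mirrors the package's sentinel so this sketch compiles on its own.
var ErrEmpty = errors.New("queue: queue is empty")

// sliceQueue is a hypothetical in-memory stand-in for the inner queue.
type sliceQueue struct {
	mu    sync.Mutex
	items [][]byte
}

func (q *sliceQueue) Dequeue() ([]byte, error) {
	q.mu.Lock()
	defer q.mu.Unlock()
	if len(q.items) == 0 {
		return nil, ErrEmpty
	}
	head := q.items[0]
	q.items = q.items[1:]
	return head, nil
}

// pool is a hypothetical worker pool; it is not the package's implementation.
type pool struct {
	done chan struct{}
	wg   sync.WaitGroup
}

// startPool launches the given number of goroutines. Each one repeatedly
// dequeues from inner and passes the result, error included, to handler.
func startPool(inner *sliceQueue, handler func([]byte, error), workers int) *pool {
	p := &pool{done: make(chan struct{})}
	for i := 0; i < workers; i++ {
		p.wg.Add(1)
		go func() {
			defer p.wg.Done()
			for {
				select {
				case <-p.done:
					return
				default:
					handler(inner.Dequeue())
				}
			}
		}()
	}
	return p
}

// Close stops the workers and waits for running handler calls to return.
// It never touches the inner queue, which the caller closes separately.
func (p *pool) Close() error {
	close(p.done)
	p.wg.Wait()
	return nil
}

func main() {
	inner := &sliceQueue{items: [][]byte{[]byte("a"), []byte("b"), []byte("c")}}
	var processed sync.WaitGroup
	processed.Add(3)
	p := startPool(inner, func(data []byte, err error) {
		if errors.Is(err, ErrEmpty) {
			time.Sleep(time.Millisecond) // avoid busy-waiting on an empty queue
			return
		}
		processed.Done()
	}, 2)
	processed.Wait()
	p.Close()
	fmt.Println("processed 3 items")
}
```

Because several workers dequeue concurrently, the order in which handlers finish is not guaranteed in this sketch.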
type Handler ¶
type Handler func(data []byte, err error)
Handler operates on data from the async queue. Data and err come directly from the inner queue's dequeue operation, so the handler must handle non-nil errors, including ErrEmpty.
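Since every worker sees ErrEmpty whenever the inner queue has nothing to offer, a handler may want to back off instead of sleeping for a fixed interval. The following is a minimal sketch of an exponential-backoff wrapper. The backoff type and withBackoff function are hypothetical helpers, not part of the package; Handler and ErrEmpty are redeclared locally, and the sleep function is injected so the example can record delays instead of waiting.

```go
package main

import (
	"errors"
	"fmt"
	"log"
	"sync"
	"time"
)

// ErrEmpty mirrors the package's sentinel so this sketch compiles on its own.
var ErrEmpty = errors.New("queue: queue is empty")

// Handler repeats the function shape used by the package example.
type Handler func(data []byte, err error)

// backoff tracks the current delay. It is safe for concurrent use because
// several workers may call the same handler at once.
type backoff struct {
	mu       sync.Mutex
	min, max time.Duration
	cur      time.Duration
}

// next returns the delay to use now and doubles the stored delay up to max.
func (b *backoff) next() time.Duration {
	b.mu.Lock()
	defer b.mu.Unlock()
	if b.cur < b.min {
		b.cur = b.min
	}
	d := b.cur
	b.cur *= 2
	if b.cur > b.max {
		b.cur = b.max
	}
	return d
}

// reset returns the delay to min once data arrives again.
func (b *backoff) reset() {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.cur = b.min
}

// withBackoff wraps process in a Handler that sleeps for a growing interval
// on ErrEmpty, logs other errors, and resets the delay on success.
func withBackoff(b *backoff, sleep func(time.Duration), process func([]byte)) Handler {
	return func(data []byte, err error) {
		if errors.Is(err, ErrEmpty) {
			sleep(b.next())
			return
		}
		if err != nil {
			log.Println(err)
			return
		}
		b.reset()
		process(data)
	}
}

func main() {
	var slept []time.Duration
	record := func(d time.Duration) { slept = append(slept, d) } // stands in for time.Sleep
	b := &backoff{min: 10 * time.Millisecond, max: 40 * time.Millisecond}
	h := withBackoff(b, record, func(data []byte) { fmt.Println(string(data)) })

	for i := 0; i < 4; i++ {
		h(nil, ErrEmpty)
	}
	h([]byte("hi"), nil) // prints "hi" and resets the delay
	h(nil, ErrEmpty)
	fmt.Println(slept) // prints: [10ms 20ms 40ms 40ms 10ms]
}
```

In real use, time.Sleep would be passed as the sleep argument. The cap keeps shutdown latency bounded, since a worker cannot notice a close request while it is sleeping.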
type Queue ¶
type Queue interface {
// Add data to the queue. Safe for concurrent use.
Enqueue(data []byte) error
// Remove data from the queue. Safe for concurrent use. Returns ErrEmpty
// if the queue contains no data.
Dequeue() ([]byte, error)
// Close the queue.
io.Closer
}
Queue stores data as byte slices and supports concurrent enqueue and dequeue operations.
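Any type with these three methods can serve as the inner queue. As an illustration, here is a minimal in-memory sketch guarded by a mutex. memQueue is a hypothetical type, not something the package provides, and its first-in, first-out ordering is an assumption of this sketch rather than a documented guarantee of the interface. Queue and ErrEmpty are redeclared locally so the block runs on its own.

```go
package main

import (
	"errors"
	"fmt"
	"io"
	"sync"
)

// ErrEmpty mirrors the package's sentinel so this sketch compiles on its own.
var ErrEmpty = errors.New("queue: queue is empty")

// Queue repeats the interface documented above.
type Queue interface {
	Enqueue(data []byte) error
	Dequeue() ([]byte, error)
	io.Closer
}

// memQueue is a hypothetical FIFO queue held in memory.
type memQueue struct {
	mu    sync.Mutex
	items [][]byte
}

// Enqueue appends a copy of data, so later changes to the caller's slice
// do not alter what is stored.
func (q *memQueue) Enqueue(data []byte) error {
	q.mu.Lock()
	defer q.mu.Unlock()
	q.items = append(q.items, append([]byte(nil), data...))
	return nil
}

// Dequeue removes and returns the oldest item, or ErrEmpty if there is none.
func (q *memQueue) Dequeue() ([]byte, error) {
	q.mu.Lock()
	defer q.mu.Unlock()
	if len(q.items) == 0 {
		return nil, ErrEmpty
	}
	head := q.items[0]
	q.items = q.items[1:]
	return head, nil
}

// Close has nothing to release for an in-memory queue.
func (q *memQueue) Close() error { return nil }

// Compile-time check that *memQueue satisfies Queue.
var _ Queue = (*memQueue)(nil)

func main() {
	q := &memQueue{}
	defer q.Close()
	q.Enqueue([]byte("hi"))
	q.Enqueue([]byte("hello"))
	for {
		data, err := q.Dequeue()
		if errors.Is(err, ErrEmpty) {
			break
		}
		fmt.Println(string(data))
	}
}
```

A queue like this loses its contents when the process exits, which is the gap a persistent backend such as the SQLite3 queue is meant to fill.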
func NewSqlite3Queue ¶
NewSqlite3Queue creates an SQLite3-backed queue with ACID properties.