queue

package module
v1.0.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 23, 2020 License: AGPL-3.0 Imports: 7 Imported by: 0

README

Package queue implements a few types of queues. Notably an asynchronous queue
for non-blocking data processing and a SQLite3 queue.

Package httpq is a specialization of the async queue for HTTP requests. It also
demonstrates handling ErrEmpty in an async queue through conditional variables
rather than busy waiting.

Documentation

Overview

Package queue implements a few types of queues. Notably an asynchronous queue for non-blocking data processing and a SQLite3 queue.

Index

Examples

Constants

This section is empty.

Variables

View Source
var ErrEmpty = errors.New("queue: queue is empty")

ErrEmpty is returned when dequeuing from an empty queue.

Functions

This section is empty.

Types

type AsyncQueue

type AsyncQueue interface {
	// Add data to the queue. Safe for concurrent use.
	Enqueue(data []byte) error

	// Close the queue. Can be done at any point after the queue is
	// constructed.
	io.Closer
}

AsyncQueue processes queue data asynchronously.

Example

An example of using the async queue to process data.

package main

import (
	"fmt"
	"log"
	"time"

	"github.com/esote/queue"
)

func main() {
	sqlite3, err := queue.NewSqlite3Queue("test.db")
	if err != nil {
		log.Fatal(err)
	}
	defer sqlite3.Close()
	handler := func(data []byte, err error) {
		if err == queue.ErrEmpty {
			// If the queue will usually be empty, here you can
			// sleep or use exp-backoff to avoid busy-waiting. With
			// more knowledge of enqueues its possible to sleep
			// indefinitely and wake when data is added.
			time.Sleep(100 * time.Millisecond)
			return
		}
		if err != nil {
			log.Println(err)
			return
		}
		fmt.Println(string(data))
	}
	q, err := queue.NewAsyncQueue(sqlite3, handler, 2)
	if err != nil {
		log.Fatal(err)
	}
	defer q.Close()

	msgs := []string{"hi", "hello", "hey", "hiya"}
	for _, msg := range msgs {
		if err = q.Enqueue([]byte(msg)); err != nil {
			log.Fatal(err)
		}
	}

	// The messages will be dequeued "eventually". Closing the async queue
	// does not block until the queue is empty, it only blocks long enough
	// to ensure all data is accounted for (either stored in the queue or
	// done being used by a handler). However, for the purpose of this
	// example we wait to give the async queue time to process its data.
	time.Sleep(500 * time.Millisecond)
}
Output:

hi
hello
hey
hiya

func NewAsyncQueue

func NewAsyncQueue(q Queue, handler Handler, workers int) (AsyncQueue, error)

NewAsyncQueue creates an async queue that processes inner queue data through a handler and worker pool. Closing the async queue does NOT close the inner queue.

type Handler

type Handler func(data []byte, err error)

Handler operates on data from the async queue. Data and err come directly from the inner queue's dequeue operation, and so the handler should take care to handle non-nil errors, including ErrEmpty.

type Queue

type Queue interface {
	// Add data to the queue. Safe for concurrent use.
	Enqueue(data []byte) error

	// Remove data from the queue. Safe for concurrent use. Returns ErrEmpty
	// if the queue contains no data.
	Dequeue() ([]byte, error)

	// Close the queue.
	io.Closer
}

Queue contains data.

func NewMemoryQueue

func NewMemoryQueue() Queue

NewMemoryQueue creates an in-memory queue.

func NewSqlite3Queue

func NewSqlite3Queue(file string) (Queue, error)

NewSqlite3Queue creates an SQLite3-backed queue with ACID properties.

Directories

Path Synopsis
internal
pkg
httpq
Package httpq is an async queue for HTTP requests.
Package httpq is an async queue for HTTP requests.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL