Golang creating a worker thread pool using maps

July 24, 2016

This example shows how to create a worker thread that waits for jobs and works through them one by one. We use a map as the list to which jobs are added. For simulation purposes, the jobs in this tutorial are strings, and the worker creates a bcrypt hash of each one.

The example

In this example we use a package from outside the standard library to create the hashes of the strings. Install it with:

go get golang.org/x/crypto/bcrypt

The whole code is provided below. The basics are explained in the comments.

package main
import (
	"fmt"
	"sync"
	"time"
	"golang.org/x/crypto/bcrypt"
)
type WorkThread struct {
	Todo    map[int]string
	LastKey int
	KeyLock sync.Mutex
	MapLock sync.Mutex
}
func (w *WorkThread) Start() {
	fmt.Println("Start worker")
	//This thread will wait for work
	for {
		//Check if there is a job to do, holding the lock while reading the map
		w.MapLock.Lock()
		Pending := len(w.Todo)
		w.MapLock.Unlock()
		if Pending == 0 {
			//Nothing to do, wait for work
			fmt.Println("No jobs, waiting...")
			time.Sleep(2 * time.Second)
			continue //Move back to the beginning of the loop
		}
		//We have work to do, let's get the key of the job
		JobKey := w.GetNextKey()
		//Read the string under the lock, then hash it without holding the lock
		w.MapLock.Lock()
		Job := w.Todo[JobKey]
		w.MapLock.Unlock()
		//Generate a hash of the string, using a bcrypt cost of 15
		HashedString, err := bcrypt.GenerateFromPassword([]byte(Job), 15)
		if err != nil {
			fmt.Println("Job failed: " + err.Error())
		} else {
			fmt.Println("Job completed: " + string(HashedString))
		}
		//Remove this string from the job queue
		w.RemoveFromQue(JobKey)
	}
}
func (w *WorkThread) Newkey() int {
	//This is to get a new key for the map
	w.KeyLock.Lock()         //Lock the LastKey integer
	defer w.KeyLock.Unlock() //Tell Go to call this function when Newkey() returns
	w.LastKey++              //Increment the last used key by one to add to our map
	return w.LastKey
}
func (w *WorkThread) AddToQueue(Newstring string) {
	//Add job to the queue
	NewKey := w.Newkey()       //First get a new key
	w.MapLock.Lock()           //Lock the map, make it safe to write to
	defer w.MapLock.Unlock()   //Unlock the map after function return
	w.Todo[NewKey] = Newstring //Add the string with the new key to the map
	fmt.Println("Job added!")
}
func (w *WorkThread) RemoveFromQue(DeletKey int) {
	w.MapLock.Lock()         //Lock the map
	defer w.MapLock.Unlock() //Call this function on return
	delete(w.Todo, DeletKey) //Remove the job from queue
	return
}
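func (w *WorkThread) GetNextKey() int {
	var ReturnKey int
	w.MapLock.Lock()         //Lock the map while reading it
	defer w.MapLock.Unlock() //Unlock the map on return
	//Get a key from the map; map iteration order is random, so this is not necessarily the oldest job
	for key := range w.Todo {
		ReturnKey = key
		break
	}
	return ReturnKey
}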
//Make the workthread globally accessible
var Workpool WorkThread
func main() {
	//Initialize the work thread; the zero value of WorkThread is usable once the map is created
	Workpool.Todo = make(map[int]string)
	Workpool.LastKey = 0
	go Workpool.Start() //Start the work pool
	//Wait and add items to the workpool, simulating jobs that arrive over time
	time.Sleep(3 * time.Second)
	Workpool.AddToQueue("Hash me")
	Workpool.AddToQueue("And me too")
	Workpool.AddToQueue("Don't forget to hash this one")
	time.Sleep(5 * time.Second)
	Workpool.AddToQueue("More work!")
	Workpool.AddToQueue("Not over yet")
	time.Sleep(4 * time.Second)
	Workpool.AddToQueue("Almost there")
	Workpool.AddToQueue("Last one")
	//Make sure main doesn't return before the work is done
	for {
		time.Sleep(time.Second)
	}
}

Result

Start worker
No jobs, waiting...
No jobs, waiting...
Job added!
Job added!
Job added!
Job completed: $2a$15$RaZgzPXVexXxTexzUcjLl.GmVJyMZ.Tn82mVV86hLLUIcltXJcOnW
Job added!
Job added!
Job completed: $2a$15$.zcrKcmAYR/F1FV4upXl..ksZ3HM0sqkWVVPyrDSRU9zi2831GaZS
Job added!
Job added!
Job completed: $2a$15$aje7wLgUIBCeT1pWSe68eeJpIhTEjvGETDDYkDi2qJMPGWr4UELJW
Job completed: $2a$15$/S8q48lFUk9O2eHMEqNuhuti14d6pzi0.5nrhc8WumVQZnzrsQf6q
Job completed: $2a$15$8w9VNgX2W49NZR8DIaG67OSM1CpEHVB9zUGbHGYbCMhNWCU1RMo4i
Job completed: $2a$15$mT4RQuwHGbVYm7jH4FUVGO13ztVjqk3O.W9uSWGoDCqloikNagWCG
Job completed: $2a$15$.WQlnGMr16ezpHny6VkqF.byJlpURzU6hD4wJy1PNeZAVXtaLp3Eq
No jobs, waiting...
No jobs, waiting...
No jobs, waiting...
...

Notes

Hashing strings doesn't have much meaning on its own; the strings are only a stub. You can store your own type as the map value instead, and Go also accepts any comparable type (including structs whose fields are all comparable) as the map key.
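
As a minimal sketch of storing a custom type as the map value, the program below replaces the string with a small struct. The Job type, its fields, and the describe helper are hypothetical names made up for this illustration; they are not part of the example above.

```go
package main

import "fmt"

// Job is a hypothetical job type; it stands in for the plain string used above.
type Job struct {
	Input string // the string to hash
	Cost  int    // the bcrypt cost to use for this job
}

// describe formats a job and its map key for printing.
func describe(key int, j Job) string {
	return fmt.Sprintf("job %d: hash %q with cost %d", key, j.Input, j.Cost)
}

func main() {
	// The map keeps int keys, as in the example; only the value type changes.
	todo := map[int]Job{
		1: {Input: "Hash me", Cost: 15},
		2: {Input: "And me too", Cost: 12},
	}
	fmt.Println(describe(1, todo[1]))
	fmt.Println(describe(2, todo[2]))
}
```

With a value type like this, the worker could read the cost from the job instead of hard-coding 15.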

The main function first starts the thread (a goroutine) that will do the work, using my custom type WorkThread. The functions are self-explanatory. We use mutex locks to prevent data races, which is important whenever separate goroutines access the same data, and that applies to reads of the map as well as writes.
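
To illustrate the locking on its own, here is a cut-down sketch in which many goroutines add jobs to one map at the same time. SafeTodo, NewSafeTodo, and addConcurrently are hypothetical names for this illustration, and the sketch uses a single mutex for both the key counter and the map, which is a simplification of the two locks in the example.

```go
package main

import (
	"fmt"
	"sync"
)

// SafeTodo is a hypothetical, reduced version of WorkThread with one lock.
type SafeTodo struct {
	mu      sync.Mutex
	todo    map[int]string
	lastKey int
}

// NewSafeTodo returns a SafeTodo with its map initialized.
func NewSafeTodo() *SafeTodo {
	return &SafeTodo{todo: make(map[int]string)}
}

// Add stores a job under a fresh key; the counter and the map share the lock.
func (s *SafeTodo) Add(job string) int {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.lastKey++
	s.todo[s.lastKey] = job
	return s.lastKey
}

// Len reports the number of pending jobs; reads take the lock too.
func (s *SafeTodo) Len() int {
	s.mu.Lock()
	defer s.mu.Unlock()
	return len(s.todo)
}

// addConcurrently starts n goroutines that each add one job, then waits for all of them.
func addConcurrently(s *SafeTodo, n int) {
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			s.Add(fmt.Sprintf("job %d", i))
		}(i)
	}
	wg.Wait()
}

func main() {
	s := NewSafeTodo()
	addConcurrently(s, 100)
	fmt.Println("pending jobs:", s.Len()) // pending jobs: 100
}
```

Running a program like this with go run -race is a quick way to check whether any access to the map was left unprotected.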

We make the WorkThread globally accessible so we can add work to the queue anywhere in the code.

If you are going to use big data types, I advise you to store pointers in the map, which causes less overhead because the values are not copied on every read and write.
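
A minimal sketch of the pointer approach, assuming a hypothetical BigJob type with a 4 KiB payload and a hypothetical markDone helper. The map holds *BigJob, so looking up a job copies only the pointer, and the job can be changed in place.

```go
package main

import "fmt"

// BigJob is a hypothetical large job type; the array makes every copy 4 KiB.
type BigJob struct {
	Payload [4096]byte
	Done    bool
}

// markDone sets the Done flag through the pointer stored in the map.
// Only the pointer is copied by the lookup, not the BigJob itself.
func markDone(todo map[int]*BigJob, key int) {
	if job, ok := todo[key]; ok {
		job.Done = true
	}
}

func main() {
	todo := map[int]*BigJob{1: &BigJob{}}
	markDone(todo, 1)
	fmt.Println(todo[1].Done) // true
}
```

With a map[int]BigJob the same update would need the whole struct to be read out, changed, and written back, because Go does not allow assigning to a field of a map element directly.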

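Also note that LastKey is never reset. Once the highest value of int is reached, the next increment overflows and wraps around to a negative number rather than stopping the program. So some sort of reset for "LastKey" should be in place in a long-running program.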
