Interview/Golang

Golang Channel

김 정출 2024. 10. 8. 22:55

Golang에서 CSP?

  • Go 언어의 채널(Channel)은 Goroutine 간에 안전하게 데이터를 주고받을 수 있는 메커니즘입니다. 채널은 동시성 프로그래밍에서 중요한 역할을 하며, Go 언어의 동시성 모델인 Communicating Sequential Processes (CSP)를 구현하는 데 사용됩니다.

Communicating Sequential Processe(CSP)

CSP (Communicating Sequential Processes)는 동시성을 다루기 위한 수학적 모델로, 1978년 영국의 컴퓨터 과학자 Tony Hoare에 의해 제안되었습니다. 이 모델은 여러 개의 독립적인 프로세스들이 서로 통신을 통해 협력하여 작업을 수행하는 방식으로 동시성을 표현합니다. 프로세스는 각기 독립적으로 실행되며, 통신은 메시지 전달을 통해 이루어집니다. Go의 Goroutine채널 시스템은 이 CSP 모델을 기반으로 설계되었습니다.

CSP (Communicating Sequential Processes)란?

CSP는 독립적인 프로세스들이 서로 메시지를 주고받으며 동작하는 방식을 정의한 모델입니다. 이 모델은 병렬 처리와 동시성 문제를 해결하기 위한 수학적 틀을 제공하며, 특히 프로세스 간의 상호작용과 통신을 명확하게 표현하는 데 중점을 둡니다.

CSP는 다음 두 가지 원칙에 의해 작동합니다:

  1. 프로세스는 독립적으로 동작: 각 프로세스는 자신만의 작업을 독립적으로 수행하며, 다른 프로세스와 직접적인 상태 공유를 하지 않습니다.
  2. 메시지 전달을 통한 통신: 프로세스들 간의 상호작용은 메시지 전달(message passing)을 통해 이루어집니다. 각 프로세스는 채널을 통해서만 다른 프로세스와 정보를 주고받을 수 있습니다.

이 CSP 모델은 잠금(lock) 없이 동시성을 안전하게 관리할 수 있는 방법을 제공하며, 데이터 경쟁(data race) 문제를 최소화합니다.

CSP의 주요 개념

  1. 프로세스:
    • CSP에서 프로세스는 독립적으로 실행되는 작업 단위를 말합니다. 각 프로세스는 자신의 상태와 작업을 관리하며, 다른 프로세스의 상태에 직접 접근하지 않습니다.
    • 각 프로세스는 일련의 작업을 순차적으로 수행하지만, 동시에 여러 프로세스가 독립적으로 실행될 수 있습니다.
  2. 채널 (Channel):
    • 채널은 CSP에서 프로세스 간의 통신을 위한 주요 메커니즘입니다. 프로세스는 채널을 통해 메시지를 보내고 받을 수 있습니다.
    • CSP에서는 채널을 통해 동기적 통신을 사용합니다. 즉, 송신자가 메시지를 채널에 보내면 수신자가 이를 받을 때까지 대기하며, 수신자가 없으면 메시지는 즉시 전달되지 않습니다.
  3. 동기적 메시지 전달 (Synchronous Message Passing):
    • CSP에서 프로세스들은 동기적으로 메시지를 주고받습니다. 한 프로세스가 메시지를 보내면, 상대방 프로세스가 메시지를 받을 때까지 송신 프로세스는 대기합니다. 이는 블로킹 통신이라고도 하며, 이를 통해 데이터 전달의 순서를 명확하게 정의할 수 있습니다.
    • 송신자와 수신자가 모두 준비된 상태에서만 통신이 이루어지기 때문에, 데이터 경합이나 동기화 문제를 최소화할 수 있습니다.
  4. 프로세스 간 비공유 상태 (Non-shared State):
    • CSP의 핵심 원칙 중 하나는 프로세스 간에 직접적으로 상태를 공유하지 않는 것입니다. 공유 메모리를 통한 통신이 아닌, 메시지 전달을 통한 통신만 허용되므로, 프로세스 간의 데이터 경쟁이 일어나지 않습니다.
    • Go에서 Goroutine채널을 사용한 동시성 처리도 이 원칙을 따릅니다.

Go에서의 CSP 구현

Go의 동시성 모델은 CSP 개념을 고스란히 반영하여 설계되었습니다. Goroutine은 CSP에서 말하는 프로세스에 해당하고, 채널은 프로세스들 간의 통신 메커니즘을 담당합니다.

1. Goroutine과 프로세스

Go에서 Goroutine은 독립적으로 실행되는 함수입니다. Goroutine은 기본적으로 Go 런타임에서 관리되는 경량 스레드로서, CSP의 독립적인 프로세스 개념을 구현합니다.

  • 각 Goroutine은 독립적으로 실행되며, 다른 Goroutine의 상태에 직접 접근하지 않고, 채널을 통해서만 통신합니다.
  • 이를 통해 개발자는 잠금(lock)을 직접 관리하지 않고도 안전하게 동시성을 구현할 수 있습니다.

2. 채널과 메시지 전달

Go에서 채널(Channel)은 CSP의 메시지 전달 개념을 구현한 것입니다. Goroutine 간의 데이터는 채널을 통해서만 주고받으며, 이를 통해 동시성을 안전하게 관리합니다.

  • 동기적 통신: Go에서 기본적으로 채널을 통한 통신은 동기적입니다. 한 Goroutine이 채널에 데이터를 보내면, 다른 Goroutine이 그 데이터를 받을 때까지 송신자는 블로킹됩니다. 이 동기화는 CSP 모델의 동기적 메시지 전달 원칙을 따릅니다.
// 채널을 생성
ch := make(chan int)

// Goroutine에서 채널로 데이터 전송
go func() {
    ch <- 42  // 42라는 값을 채널에 보냄
}()

// 채널에서 데이터 수신
value := <-ch
fmt.Println(value)  // 출력: 42
  1. 비공유 상태

Goroutine은 다른 Goroutine의 상태에 직접적으로 접근하지 않고, 채널을 통해서만 데이터를 교환합니다. 이렇게 함으로써 상태 공유로 인한 데이터 경쟁(data race) 문제를 방지할 수 있습니다.

// Goroutine에서 직접적인 상태 공유 없이 채널로만 통신
go func() {
    ch <- 1
}()

go func() {
    ch <- 2
}()

x := <-ch
y := <-ch
  1. Buffered Channels (버퍼 채널)

기본 채널은 동기적으로 작동하지만, Go는 비동기적인 통신을 위해 버퍼된 채널도 제공합니다. 버퍼 채널을 사용하면, 송신자는 버퍼가 꽉 차기 전까지 블로킹되지 않으며, 수신자는 버퍼에 데이터가 남아 있을 때까지 데이터를 받을 수 있습니다.

ch := make(chan int, 2) // 버퍼 크기가 2인 채널 생성
ch <- 1
ch <- 2  // 버퍼에 여유 공간이 있으므로 블로킹되지 않음

fmt.Println(<-ch)  // 1 출력
fmt.Println(<-ch)  // 2 출력

Golang의 Channel

다시 돌아와서 채널에 대해서 알아보겠습니다.

1. 채널의 개념

채널은 Goroutine이 서로 데이터를 주고받는 파이프라인 역할을 합니다. 한 Goroutine이 데이터를 채널에 보내면 다른 Goroutine은 그 데이터를 받을 수 있습니다. 즉, 송신수신을 통해 Goroutine 간 통신이 이루어집니다.

ch := make(chan int) // 정수형 데이터를 주고받는 채널 생성

채널을 통해 데이터를 주고받을 때는 송신자수신자가 일치해야 합니다. 즉, 송신자가 데이터를 보낼 때 수신자가 그 데이터를 받을 준비가 되어 있어야 합니다. 그렇지 않으면 해당 Goroutine은 블로킹됩니다.

2. 채널의 기본 사용법

채널의 기본적인 송신과 수신의 사용법은 다음과 같습니다.


ch := make(chan int)  // int 타입의 채널 생성

// 송신: ch 채널로 42를 보냄
ch <- 42

// 수신: ch 채널에서 데이터를 수신
value := <-ch

위 예시에서:

  • ch <- 42: 채널로 값을 송신.
  • value := <-ch: 채널에서 값을 수신.

3. 채널의 종류

채널은 크게 Unbuffered 채널Buffered 채널로 나뉩니다.

(1) Unbuffered 채널

Unbuffered 채널은 송신과 수신이 동시에 일어나야 합니다. 즉, 송신자가 채널로 데이터를 보낼 때 수신자가 데이터를 받을 준비가 되어 있지 않으면 송신자는 블로킹됩니다. 반대로 수신자가 데이터를 받으려고 할 때 송신자가 데이터를 보내지 않았다면 수신자도 블로킹됩니다.

package main

import "fmt"

func main() {
    ch := make(chan int) // **Unbuffered** 채널 생성

    // Goroutine에서 채널로 값을 보냄
    go func() {
        ch <- 42 // 송신자가 수신자가 준비될 때까지 블로킹됨
    }()

    // 메인 Goroutine에서 채널로부터 값을 받음
    value := <-ch
    fmt.Println(value)  // 출력: 42
}

(2) Buffered 채널

Buffered 채널은 송신자가 일정량의 데이터를 수신자 없이도 채널에 보낼 수 있습니다. 버퍼가 꽉 차면 송신자는 다시 수신자가 데이터를 가져갈 때까지 블로킹됩니다.

ch := make(chan int, 2) // 버퍼 크기가 2인 채널 생성

ch <- 1  // 비어있기 때문에 데이터를 보냄 (블로킹되지 않음)
ch <- 2  // 버퍼에 두 번째 데이터 저장
ch <- 3  // 버퍼가 가득 차서 블로킹됨

버퍼 크기만큼 데이터를 버퍼에 저장할 수 있으며, 수신자는 버퍼에 있는 데이터를 꺼낼 수 있습니다. 그러나 버퍼가 가득 차거나 비어 있을 때는 송신자 또는 수신자가 블로킹됩니다.

4. 채널 닫기

채널은 닫을 수 있으며, 닫힌 채널에 데이터를 더 이상 보낼 수 없습니다. 하지만 닫힌 채널에서 데이터를 받는 것은 가능합니다. close() 함수를 사용하여 채널을 닫을 수 있습니다.

close(ch)

채널을 닫으면 다음과 같은 동작이 발생합니다:

  • 더 이상 채널에 데이터를 보낼 수 없습니다. 송신하려고 하면 panic이 발생합니다.
  • 채널에서 데이터를 계속 수신할 수 있습니다. 수신한 값이 더 이상 없으면, zero value가 반환됩니다.
package main

import "fmt"

func main() {
    ch := make(chan int, 3)

    ch <- 1
    ch <- 2
    ch <- 3

    close(ch) // 채널을 닫음

    // range를 통해 채널에서 데이터를 읽음
    for value := range ch {
        fmt.Println(value)  // 출력: 1, 2, 3
    }
}

중요: 닫힌 채널에서 값을 읽는 것은 가능하지만, 닫힌 채널에 값을 보내면 panic이 발생합니다. 따라서, 채널을 닫을 때는 주의가 필요합니다.

5. select 문을 통한 다중 채널 처리

select 문은 여러 개의 채널을 동시에 처리할 수 있는 방법입니다. select 문을 사용하면 여러 채널 중에서 하나 이상의 채널이 준비될 때 해당 채널을 처리합니다.

package main

import (
    "fmt"
    "time"
)

func main() {
    ch1 := make(chan string)
    ch2 := make(chan string)

    // 첫 번째 Goroutine
    go func() {
        time.Sleep(1 * time.Second)
        ch1 <- "Channel 1"
    }()

    // 두 번째 Goroutine
    go func() {
        time.Sleep(2 * time.Second)
        ch2 <- "Channel 2"
    }()

    // select를 사용하여 다중 채널 처리
    for i := 0; i < 2; i++ {
        select {
        case msg1 := <-ch1:
            fmt.Println("Received", msg1)
        case msg2 := <-ch2:
            fmt.Println("Received", msg2)
        }
    }
}

설명:

  • select 문은 여러 채널에서 하나 이상의 채널이 준비될 때까지 대기합니다.
  • 각 채널에서 데이터가 수신되면 그에 맞는 case가 실행됩니다.
  • select 문은 비동기적으로 여러 채널을 동시에 처리할 때 매우 유용합니다.

6. 채널과 동기화

채널은 동기화 도구로도 사용될 수 있습니다. Goroutine 간에 작업을 완료한 시점을 알리거나 특정 순서대로 작업을 처리할 때 채널을 활용할 수 있습니다.

go
Copy code
package main

import (
    "fmt"
    "time"
)

func worker(done chan bool) {
    fmt.Println("Worker started")
    time.Sleep(2 * time.Second)
    fmt.Println("Worker done")

    // 작업 완료 신호를 채널로 보냄
    done <- true
}

func main() {
    done := make(chan bool)

    // worker Goroutine 시작
    go worker(done)

    // worker가 작업을 마칠 때까지 대기
    <-done

    fmt.Println("Main finished")
}

설명:

  • worker 함수는 작업을 수행한 후 done 채널에 true 값을 전송하여 작업 완료를 알립니다.
  • 메인 Goroutine은 done 채널에서 값을 받을 때까지 대기합니다. 이를 통해 메인 Goroutine은 worker가 작업을 완료할 때까지 기다리게 됩니다.

7. Deadlock (데드락)

Go에서 채널을 사용할 때 주의해야 할 문제 중 하나는 데드락(deadlock)입니다. 데드락은 송신자가 채널에 데이터를 보내지만, 수신자가 해당 데이터를 받지 않을 때 발생합니다. 이때 송신자는 영원히 블로킹 상태에 빠지며 프로그램은 멈춥니다.

package main

func main() {
    ch := make(chan int)
    ch <- 1  // 여기에 도달하면 데드락 발생 (수신자가 없기 때문)
}

이 코드는 데드락을 유발합니다. ch 채널에 값을 보내지만, 값을 받을 수신자가 없기 때문에 프로그램이 멈춥니다.

8. 채널의 활용

채널은 Go 언어의 동시성을 구현하는 데 매우 유용합니다. 다양한 패턴에 사용할 수 있으며, Goroutine 간의 데이터를 주고받는 파이프라인 역할을 합니다. 이를 통해 동시성을 안전하게 관리하고, 복잡한 동기화 문제를 단순화할 수 있습니다.

  • 작업 분배(Worker Pool): Goroutine을 여러 개 실행하고, 각 Goroutine이 작업을 분배받아 처리할 때 채널을 사용합니다.
  • 비동기 작업 처리: 채널을 통해 여러 비동기 작업의 결과를 수집하고 처리할 수 있습니다.
  • 다중 채널 처리: select 문을 사용하여 여러 채널을 동시에 관리할 수 있습니다.

피보나치 수열 계산하기

package main

import (
    "fmt"
    "sync"
)

// Fibonacci 계산 함수
func fibonacci(n int, ch chan<- int, wg *sync.WaitGroup) {
    defer wg.Done() // 작업 완료 시 WaitGroup에서 하나 감소
    a, b := 0, 1
    for i := 0; i < n; i++ {
        a, b = b, a+b
    }
    ch <- a // 계산 결과를 채널로 전송
}

func main() {
    const numCalculations = 10

    // 채널과 WaitGroup을 생성
    ch := make(chan int, numCalculations)
    var wg sync.WaitGroup

    // 여러 Goroutine을 실행하여 Fibonacci 계산
    for i := 0; i < numCalculations; i++ {
        wg.Add(1) // WaitGroup에 작업 추가
        go fibonacci(i, ch, &wg)
    }

    // Goroutine에서 계산이 모두 완료될 때까지 대기
    go func() {
        wg.Wait()
        close(ch) // 모든 작업이 완료되면 채널을 닫음
    }()

    // 결과 수집 및 출력
    for result := range ch {
        fmt.Println(result)
    }
}

설명:

  • Fibonacci Goroutine: fibonacci 함수는 특정 숫자의 Fibonacci 수를 계산하고 그 결과를 채널로 전달합니다.
  • WaitGroup: sync.WaitGroup을 사용하여 모든 Goroutine이 작업을 마칠 때까지 기다립니다. 이를 통해 동시성 작업이 완료된 후 결과를 처리할 수 있습니다.
  • Main 함수: 메인 함수에서는 10개의 Fibonacci 수를 동시에 계산하며, Goroutine이 계산을 끝낼 때마다 채널로부터 결과를 수신합니다.

Worker Pool을 이용한 작업 분배

ppackage main

import (
    "fmt"
    "math/rand"
    "sync"
    "time"
)

// Job 구조체
type Job struct {
    ID       int
    Duration time.Duration
}

// Worker 구조체
type Worker struct {
    ID          int
    JobChannel  chan Job
    QuitChannel chan bool
}

// NewWorker는 새로운 Worker를 생성합니다.
func NewWorker(id int) Worker {
    return Worker{
        ID:          id,
        JobChannel:  make(chan Job),
        QuitChannel: make(chan bool),
    }
}

// Start는 Worker를 시작하여 Job을 처리합니다.
func (w Worker) Start(wg *sync.WaitGroup) {
    defer wg.Done()
    for {
        select {
        case job := <-w.JobChannel:
            fmt.Printf("Worker %d started job %d\n", w.ID, job.ID)
            time.Sleep(job.Duration) // 작업 처리 시간 시뮬레이션
            fmt.Printf("Worker %d completed job %d\n", w.ID, job.ID)
        case <-w.QuitChannel:
            fmt.Printf("Worker %d stopping\n", w.ID)
            return
        }
    }
}

// Queue는 Job을 관리하는 구조체입니다.
type Queue struct {
    jobs      chan Job
    workers   []Worker
    workerWg  sync.WaitGroup
}

// NewQueue는 새로운 Queue를 생성합니다.
func NewQueue(numWorkers int) *Queue {
    queue := &Queue{
        jobs:    make(chan Job),
        workers: make([]Worker, numWorkers),
    }

    // Worker를 초기화하고 시작합니다.
    for i := 0; i < numWorkers; i++ {
        worker := NewWorker(i)
        queue.workers[i] = worker
        queue.workerWg.Add(1)
        go worker.Start(&queue.workerWg)
    }

    return queue
}

// AddJob은 Queue에 새로운 Job을 추가합니다.
func (q *Queue) AddJob(job Job) {
    q.jobs <- job
}

// Start는 Queue를 시작하여 Job을 워커에 분배합니다.
func (q *Queue) Start() {
    go func() {
        for job := range q.jobs {
            // 가용한 Worker에게 Job을 분배합니다.
            for _, worker := range q.workers {
                select {
                case worker.JobChannel <- job:
                    break // Job을 Worker에 보낸 후 루프 종료
                default:
                    // Worker가 바쁘면 다음 Worker를 시도
                    continue
                }
            }
        }
    }()
}

// Stop은 모든 Worker를 종료합니다.
func (q *Queue) Stop() {
    for _, worker := range q.workers {
        worker.QuitChannel <- true // Worker 종료 신호 전송
    }
    q.workerWg.Wait() // 모든 Worker가 종료될 때까지 대기
}

func main() {
    rand.Seed(time.Now().UnixNano())
    queue := NewQueue(3) // 3개의 Worker를 가진 Queue 생성
    queue.Start()         // Queue 시작

    // 10개의 Job을 Queue에 추가
    for i := 0; i < 10; i++ {
        duration := time.Duration(rand.Intn(3)+1) * time.Second
        job := Job{
            ID:       i,
            Duration: duration,
        }
        queue.AddJob(job)
    }

    // 모든 작업을 처리한 후 Queue 종료
    time.Sleep(15 * time.Second) // 잠시 대기하여 모든 작업이 완료되도록 함
    queue.Stop()                 // 모든 Worker 종료
    fmt.Println("All workers stopped.")
}

설명

  1. Job 구조체: 각 작업을 나타내며, ID와 작업을 수행하는 데 걸리는 시간을 포함합니다.
  2. Worker 구조체: 각 Worker를 나타내며, 작업을 수신할 수 있는 채널(JobChannel)과 종료 신호를 받을 채널(QuitChannel)을 포함합니다.
  3. NewWorker: 새로운 Worker를 생성하는 함수입니다.
  4. Start: Worker가 작업을 처리하는 메서드입니다. Worker는 JobChannel에서 Job을 수신하고, 작업이 완료되면 결과를 출력합니다.
  5. Queue 구조체: 작업을 관리하는 구조체입니다. jobs 채널을 통해 작업을 수신하고, 여러 Worker에 작업을 분배합니다.
  6. NewQueue: 새로운 Queue를 생성하고 Worker를 초기화하여 시작합니다.
  7. AddJob: 새로운 Job을 Queue에 추가하는 메서드입니다.
  8. Start: Queue를 시작하여 Job을 Worker에게 분배하는 로직입니다. 가용한 Worker가 있을 때 Job을 할당합니다.
  9. Stop: 모든 Worker를 종료하는 메서드입니다.
  10. main 함수:
  • 3개의 Worker를 가진 Queue를 생성합니다.
  • 10개의 Job을 Queue에 추가합니다.
  • 모든 작업이 완료될 때까지 대기 후 Worker를 종료합니다.

동작 흐름

  • Job 생성: 랜덤한 시간을 가진 10개의 Job이 생성되어 Queue에 추가됩니다.
  • Worker: 각 Worker는 Queue에서 Job을 수신하고, 작업을 처리합니다.
  • Job 분배: Queue는 모든 Worker에게 순차적으로 Job을 분배합니다. 만약 Worker가 바쁘면 다음 Worker에게 Job을 시도합니다.
  • Worker 종료: 모든 작업이 완료되면 Worker를 종료합니다.

이러한 방식으로 구현된 Worker Pool은 Queue를 사용하여 각 작업을 분배하는 구조로, 각 Worker가 동일한 Job을 받지 않고, Queue에서 하나씩 가져와 작업을 처리할 수 있게 됩니다. 이를 통해 보다 효율적이고 유연한 작업 처리가 가능해집니다.

Gin 프레임워크를 사용하여 백그라운드에서 실행되고 있는 Goroutine채널을 통해 통신하는 예제 코드입니다.

package main

import (
    "fmt"
    "net/http"
    "sync"
    "time"

    "github.com/gin-gonic/gin"
)

// 작업 상태를 저장할 구조체
type TaskStatus struct {
    mu       sync.Mutex
    status   string
    progress int
}

var taskStatus = TaskStatus{
    status:   "Not Started", // 초기 작업 상태
    progress: 0,             // 진행 상황 (0 ~ 100)
}

// 백그라운드에서 실행될 작업 (Goroutine)
func backgroundTask() {
    for i := 0; i <= 100; i += 10 {
        time.Sleep(1 * time.Second) // 작업 시뮬레이션 (1초 대기)

        // 진행 상황 업데이트
        taskStatus.mu.Lock()
        taskStatus.progress = i
        taskStatus.status = fmt.Sprintf("In Progress: %d%%", i)
        taskStatus.mu.Unlock()
    }

    // 작업 완료 후 상태 업데이트
    taskStatus.mu.Lock()
    taskStatus.status = "Completed"
    taskStatus.progress = 100
    taskStatus.mu.Unlock()

}

func main() {
    r := gin.Default()

    // POST 요청으로 작업을 시작하는 핸들러
    r.POST("/start-task", func(c *gin.Context) {
        // 새로운 채널을 생성하여 Goroutine과 통신
        resultChan := make(chan string)

        // 이미 진행 중인 작업이 있는지 확인
        taskStatus.mu.Lock()
        if taskStatus.status == "In Progress" {
            taskStatus.mu.Unlock()
            c.JSON(http.StatusBadRequest, gin.H{"message": "A task is already in progress"})
            return
        }

        // 작업 상태 초기화
        taskStatus.status = "In Progress"
        taskStatus.progress = 0
        taskStatus.mu.Unlock()

        // 백그라운드에서 작업을 실행하는 Goroutine 시작
        go backgroundTask()

        // 즉각적인 응답 반환
        c.JSON(http.StatusOK, gin.H{
            "message": "Task started",
        })
    })

    // GET 요청으로 작업 상태를 확인하는 핸들러
    r.GET("/task-status", func(c *gin.Context) {
        // 현재 작업 상태를 반환
        taskStatus.mu.Lock()
        status := taskStatus.status
        progress := taskStatus.progress
        taskStatus.mu.Unlock()

        c.JSON(http.StatusOK, gin.H{
            "status":   status,
            "progress": progress,
        })
    })

    // 서버 실행
    r.Run(":8080") // 8080 포트에서 실행
}

결론

Go 언어의 채널은 Goroutine 간의 통신을 안전하게 처리하고, 동시성 문제를 간결하게 해결할 수 있는 강력한 도구입니다. select 문과 함께 사용하면 다양한 동시성 패턴을 쉽게 구현할 수 있으며, 동시성 프로그래밍의 복잡성을 줄여줍니다.