Golang

Golang 分散トランザクション

Golang

TCCパターンとSagaパターンの違い

TCCパターン(Try-Confirm/Cancel)

  1. Tryフェーズ: 各サービスがリソースを予約します。予約が成功すると、リソースは一時的に確保されます。
  2. Confirmフェーズ: すべてのサービスのTryフェーズが成功した場合に実行され、リソースの予約が確定されます。
  3. Cancelフェーズ: どれか一つのサービスのTryフェーズが失敗した場合や、Confirmフェーズが失敗した場合に実行され、リソースの予約がキャンセルされます。

Sagaパターン

  1. ステップのシーケンス: 各サービスが順次操作を行い、操作が成功したら次のステップに進みます。操作が失敗した場合には、すべての成功したステップに対して補償トランザクションを実行します。
  2. 補償トランザクション: 各ステップに対して補償トランザクションが定義されており、失敗時にその補償トランザクションを実行します。

TCCパターン

 

  • TCC
    • Try
    • Confirm
    • Cancel
      • TCC ControllerはCancelが成功するまでリトライし続けること
  • ServiceA, ServiceBはスケジュールジョブで更新された値の最終的なconfirm処理を自分で行う

 

データベースセットアップ

CREATE DATABASE tcc_example;

USE tcc_example;

CREATE TABLE products (
    id INT AUTO_INCREMENT PRIMARY KEY,
    name VARCHAR(255) NOT NULL,
    stock INT NOT NULL
);

-- worker_events is created before transactions because transactions declares
-- a foreign key that references it.
CREATE TABLE worker_events (
    id INT AUTO_INCREMENT PRIMARY KEY,
    status ENUM('pending', 'completed', 'failed') NOT NULL
);

CREATE TABLE transactions (
    id INT AUTO_INCREMENT PRIMARY KEY,
    service VARCHAR(255) NOT NULL,
    product_id INT NOT NULL,
    quantity INT NOT NULL,
    status ENUM('try', 'confirm_pending', 'confirm', 'cancel') NOT NULL,
    worker_event_id INT NOT NULL,
    FOREIGN KEY (product_id) REFERENCES products(id),
    FOREIGN KEY (worker_event_id) REFERENCES worker_events(id)
);

-- Seed data
INSERT INTO products (name, stock) VALUES ('ProductA', 100), ('ProductB', 200);

ServiceA

package main

import (
    "database/sql"
    "fmt"
    "net/http"
    "strconv"
    "time"

    _ "github.com/go-sql-driver/mysql"
)

var db *sql.DB

// main opens the MySQL handle, registers the TCC endpoints of Service A,
// starts the periodic status check in a goroutine and serves HTTP on :8080.
func main() {
    var err error
    db, err = sql.Open("mysql", "user:password@tcp(127.0.0.1:3306)/tcc_example")
    if err != nil {
        panic(err)
    }
    defer db.Close()

    http.HandleFunc("/try", TryHandler)
    http.HandleFunc("/confirm", ConfirmHandler)
    http.HandleFunc("/finalize", FinalizeHandler)
    http.HandleFunc("/cancel", CancelHandler)
    http.HandleFunc("/checkStatus", CheckStatusHandler)

    go scheduleStatusCheck()

    fmt.Println("Service A is running on port 8080")
    // ListenAndServe only returns with a non-nil error (for example when the
    // port is already taken), so report it instead of exiting silently.
    if err := http.ListenAndServe(":8080", nil); err != nil {
        fmt.Println("Service A stopped:", err)
    }
}

// scheduleStatusCheck calls checkStatusAndUpdate in an endless loop, pausing
// five minutes after every pass. It never returns.
func scheduleStatusCheck() {
    const pause = 5 * time.Minute
    for {
        checkStatusAndUpdate()
        time.Sleep(pause)
    }
}

// checkStatusAndUpdate loads every ServiceA transaction that is still in the
// 'try' or 'confirm_pending' state and passes each one to
// checkAndFinalizeStatus. Query, scan and iteration errors are printed.
func checkStatusAndUpdate() {
    rows, err := db.Query("SELECT product_id, worker_event_id FROM transactions WHERE service='ServiceA' AND status IN ('try', 'confirm_pending')")
    if err != nil {
        fmt.Println("Error fetching pending transactions:", err)
        return
    }

    type pendingTx struct {
        productID, workerEventID int
    }
    var pending []pendingTx
    for rows.Next() {
        var p pendingTx
        if err := rows.Scan(&p.productID, &p.workerEventID); err != nil {
            fmt.Println("Error scanning row:", err)
            continue
        }
        pending = append(pending, p)
    }
    // Next also returns false when iteration failed part-way; Err tells the
    // two cases apart.
    if err := rows.Err(); err != nil {
        fmt.Println("Error iterating pending transactions:", err)
    }
    // Release the result set before the HTTP calls below so the database
    // connection is not held while they run.
    rows.Close()

    for _, p := range pending {
        checkAndFinalizeStatus(p.productID, p.workerEventID)
    }
}

// checkAndFinalizeStatus queries this service's own /checkStatus endpoint for
// the given product and worker event and prints whether the transaction is
// confirmed, cancelled or still pending. Request failures, non-200 responses
// and unparsable bodies are printed and end the check.
func checkAndFinalizeStatus(productID, workerEventID int) {
    resp, err := http.Get(fmt.Sprintf("http://localhost:8080/checkStatus?product_id=%d&worker_event_id=%d", productID, workerEventID))
    if err != nil {
        fmt.Println("Error checking status for product", productID, ":", err)
        return
    }
    defer resp.Body.Close()

    if resp.StatusCode != http.StatusOK {
        fmt.Println("Unexpected response checking status for product", productID, ":", resp.Status)
        return
    }

    // The body has the form "Status in Service A: <status>".
    var status string
    if _, err := fmt.Fscanf(resp.Body, "Status in Service A: %s", &status); err != nil {
        fmt.Println("Error reading response body:", err)
        return
    }

    switch status {
    case "confirm":
        // Placeholder: final commit logic goes here.
        fmt.Println("Transaction for product", productID, "confirmed successfully in Service A")
    case "cancel":
        // Placeholder: cancellation logic goes here.
        fmt.Println("Transaction for product", productID, "cancelled in Service A")
    default:
        fmt.Println("Transaction for product", productID, "is still pending in Service A")
    }
}

// TryHandler handles the Try phase for Service A. It reads product_id,
// quantity and worker_event_id from the query string, deducts quantity from
// the product's stock and records a 'try' row in transactions.
//
// Parameters that are not integers, or a non-positive quantity, get 400 (a
// negative quantity would otherwise add stock). Insufficient stock gets 400
// and database errors get 500. The stock update and the record insert run in
// one database transaction so a failed insert does not leave stock deducted
// without a record to cancel it.
func TryHandler(w http.ResponseWriter, r *http.Request) {
    q := r.URL.Query()
    productID, err := strconv.Atoi(q.Get("product_id"))
    if err != nil {
        http.Error(w, "Invalid product_id in Service A", http.StatusBadRequest)
        return
    }
    quantity, err := strconv.Atoi(q.Get("quantity"))
    if err != nil || quantity <= 0 {
        http.Error(w, "Invalid quantity in Service A", http.StatusBadRequest)
        return
    }
    workerEventID, err := strconv.Atoi(q.Get("worker_event_id"))
    if err != nil {
        http.Error(w, "Invalid worker_event_id in Service A", http.StatusBadRequest)
        return
    }

    tx, err := db.Begin()
    if err != nil {
        http.Error(w, "Try failed in Service A", http.StatusInternalServerError)
        return
    }
    // After a successful Commit this Rollback only returns sql.ErrTxDone.
    defer tx.Rollback()

    // Reserve the stock; the stock >= ? guard prevents overselling.
    res, err := tx.Exec("UPDATE products SET stock = stock - ? WHERE id = ? AND stock >= ?", quantity, productID, quantity)
    if err != nil {
        http.Error(w, "Try failed in Service A", http.StatusInternalServerError)
        return
    }
    rowsAffected, err := res.RowsAffected()
    if err != nil {
        http.Error(w, "Try failed in Service A", http.StatusInternalServerError)
        return
    }
    if rowsAffected == 0 {
        http.Error(w, "Not enough stock in Service A", http.StatusBadRequest)
        return
    }

    if _, err := tx.Exec("INSERT INTO transactions (service, product_id, quantity, status, worker_event_id) VALUES ('ServiceA', ?, ?, 'try', ?)", productID, quantity, workerEventID); err != nil {
        http.Error(w, "Try transaction record failed in Service A", http.StatusInternalServerError)
        return
    }
    if err := tx.Commit(); err != nil {
        http.Error(w, "Try transaction record failed in Service A", http.StatusInternalServerError)
        return
    }

    fmt.Fprintf(w, "Try in Service A successful")
}

// ConfirmHandler moves the Service A transaction identified by product_id and
// worker_event_id from 'try' to 'confirm_pending'. It answers 400 for
// non-integer parameters, 404 when no row in the 'try' state matched, and 500
// on a database error.
func ConfirmHandler(w http.ResponseWriter, r *http.Request) {
    q := r.URL.Query()
    productID, err := strconv.Atoi(q.Get("product_id"))
    if err != nil {
        http.Error(w, "Invalid product_id in Service A", http.StatusBadRequest)
        return
    }
    workerEventID, err := strconv.Atoi(q.Get("worker_event_id"))
    if err != nil {
        http.Error(w, "Invalid worker_event_id in Service A", http.StatusBadRequest)
        return
    }

    res, err := db.Exec("UPDATE transactions SET status='confirm_pending' WHERE service='ServiceA' AND product_id=? AND worker_event_id=? AND status='try'", productID, workerEventID)
    if err != nil {
        http.Error(w, "Confirm failed in Service A", http.StatusInternalServerError)
        return
    }
    n, err := res.RowsAffected()
    if err != nil {
        http.Error(w, "Confirm failed in Service A", http.StatusInternalServerError)
        return
    }
    // Zero affected rows means there was nothing in the 'try' state to confirm.
    if n == 0 {
        http.Error(w, "No try transaction to confirm in Service A", http.StatusNotFound)
        return
    }

    fmt.Fprintf(w, "Confirm in Service A successful")
}

// FinalizeHandler moves the Service A transaction identified by product_id
// and worker_event_id from 'confirm_pending' to 'confirm'. It answers 400 for
// non-integer parameters, 404 when no 'confirm_pending' row matched, and 500
// on a database error.
func FinalizeHandler(w http.ResponseWriter, r *http.Request) {
    q := r.URL.Query()
    productID, err := strconv.Atoi(q.Get("product_id"))
    if err != nil {
        http.Error(w, "Invalid product_id in Service A", http.StatusBadRequest)
        return
    }
    workerEventID, err := strconv.Atoi(q.Get("worker_event_id"))
    if err != nil {
        http.Error(w, "Invalid worker_event_id in Service A", http.StatusBadRequest)
        return
    }

    res, err := db.Exec("UPDATE transactions SET status='confirm' WHERE service='ServiceA' AND product_id=? AND worker_event_id=? AND status='confirm_pending'", productID, workerEventID)
    if err != nil {
        http.Error(w, "Finalize failed in Service A", http.StatusInternalServerError)
        return
    }
    n, err := res.RowsAffected()
    if err != nil {
        http.Error(w, "Finalize failed in Service A", http.StatusInternalServerError)
        return
    }
    // Zero affected rows means nothing was waiting in 'confirm_pending'.
    if n == 0 {
        http.Error(w, "No confirm_pending transaction to finalize in Service A", http.StatusNotFound)
        return
    }

    fmt.Fprintf(w, "Finalize in Service A successful")
}

// CancelHandler handles the Cancel phase for Service A. It marks the matching
// 'try' or 'confirm_pending' transaction as 'cancel' and gives the reserved
// quantity back to the product's stock.
//
// Stock is only restored when a transaction row actually changed state, so a
// repeated cancel, or a cancel for a Try that never reserved anything, is a
// successful no-op instead of inflating stock. Both statements run in one
// database transaction. Non-integer parameters or a non-positive quantity get
// 400; database errors get 500.
func CancelHandler(w http.ResponseWriter, r *http.Request) {
    q := r.URL.Query()
    productID, err := strconv.Atoi(q.Get("product_id"))
    if err != nil {
        http.Error(w, "Invalid product_id in Service A", http.StatusBadRequest)
        return
    }
    quantity, err := strconv.Atoi(q.Get("quantity"))
    if err != nil || quantity <= 0 {
        http.Error(w, "Invalid quantity in Service A", http.StatusBadRequest)
        return
    }
    workerEventID, err := strconv.Atoi(q.Get("worker_event_id"))
    if err != nil {
        http.Error(w, "Invalid worker_event_id in Service A", http.StatusBadRequest)
        return
    }

    tx, err := db.Begin()
    if err != nil {
        http.Error(w, "Cancel failed in Service A", http.StatusInternalServerError)
        return
    }
    // After a successful Commit this Rollback only returns sql.ErrTxDone.
    defer tx.Rollback()

    res, err := tx.Exec("UPDATE transactions SET status='cancel' WHERE service='ServiceA' AND product_id=? AND worker_event_id=? AND status IN ('try', 'confirm_pending')", productID, workerEventID)
    if err != nil {
        http.Error(w, "Cancel transaction record failed in Service A", http.StatusInternalServerError)
        return
    }
    cancelled, err := res.RowsAffected()
    if err != nil {
        http.Error(w, "Cancel transaction record failed in Service A", http.StatusInternalServerError)
        return
    }

    // Restore quantity once per reservation that was just cancelled.
    if cancelled > 0 {
        if _, err := tx.Exec("UPDATE products SET stock = stock + ? WHERE id = ?", int64(quantity)*cancelled, productID); err != nil {
            http.Error(w, "Cancel failed in Service A", http.StatusInternalServerError)
            return
        }
    }
    if err := tx.Commit(); err != nil {
        http.Error(w, "Cancel failed in Service A", http.StatusInternalServerError)
        return
    }

    fmt.Fprintf(w, "Cancel in Service A successful")
}

// CheckStatusHandler writes "Status in Service A: <status>" for the most
// recent ServiceA transaction matching product_id and worker_event_id. It
// answers 400 for non-integer parameters, 404 when no such transaction exists
// and 500 on any other database error.
func CheckStatusHandler(w http.ResponseWriter, r *http.Request) {
    q := r.URL.Query()
    productID, err := strconv.Atoi(q.Get("product_id"))
    if err != nil {
        http.Error(w, "Invalid product_id in Service A", http.StatusBadRequest)
        return
    }
    workerEventID, err := strconv.Atoi(q.Get("worker_event_id"))
    if err != nil {
        http.Error(w, "Invalid worker_event_id in Service A", http.StatusBadRequest)
        return
    }

    var status string
    err = db.QueryRow("SELECT status FROM transactions WHERE service='ServiceA' AND product_id=? AND worker_event_id=? ORDER BY id DESC LIMIT 1", productID, workerEventID).Scan(&status)
    if err == sql.ErrNoRows {
        // A missing transaction is a client-side condition, not a server fault.
        http.Error(w, "Transaction not found in Service A", http.StatusNotFound)
        return
    }
    if err != nil {
        http.Error(w, "Failed to check status in Service A", http.StatusInternalServerError)
        return
    }

    fmt.Fprintf(w, "Status in Service A: %s", status)
}

 

ServiceB

package main

import (
    "database/sql"
    "fmt"
    "net/http"
    "strconv"
    "time"

    _ "github.com/go-sql-driver/mysql"
)

var db *sql.DB

// main opens the MySQL handle, registers the TCC endpoints of Service B,
// starts the periodic status check in a goroutine and serves HTTP on :8081.
func main() {
    var err error
    db, err = sql.Open("mysql", "user:password@tcp(127.0.0.1:3306)/tcc_example")
    if err != nil {
        panic(err)
    }
    defer db.Close()

    http.HandleFunc("/try", TryHandler)
    http.HandleFunc("/confirm", ConfirmHandler)
    http.HandleFunc("/finalize", FinalizeHandler)
    http.HandleFunc("/cancel", CancelHandler)
    http.HandleFunc("/checkStatus", CheckStatusHandler)

    go scheduleStatusCheck()

    fmt.Println("Service B is running on port 8081")
    // ListenAndServe only returns with a non-nil error (for example when the
    // port is already taken), so report it instead of exiting silently.
    if err := http.ListenAndServe(":8081", nil); err != nil {
        fmt.Println("Service B stopped:", err)
    }
}

// scheduleStatusCheck calls checkStatusAndUpdate in an endless loop, pausing
// five minutes after every pass. It never returns.
func scheduleStatusCheck() {
    const pause = 5 * time.Minute
    for {
        checkStatusAndUpdate()
        time.Sleep(pause)
    }
}

// checkStatusAndUpdate loads every ServiceB transaction that is still in the
// 'try' or 'confirm_pending' state and passes each one to
// checkAndFinalizeStatus. Query, scan and iteration errors are printed.
func checkStatusAndUpdate() {
    rows, err := db.Query("SELECT product_id, worker_event_id FROM transactions WHERE service='ServiceB' AND status IN ('try', 'confirm_pending')")
    if err != nil {
        fmt.Println("Error fetching pending transactions:", err)
        return
    }

    type pendingTx struct {
        productID, workerEventID int
    }
    var pending []pendingTx
    for rows.Next() {
        var p pendingTx
        if err := rows.Scan(&p.productID, &p.workerEventID); err != nil {
            fmt.Println("Error scanning row:", err)
            continue
        }
        pending = append(pending, p)
    }
    // Next also returns false when iteration failed part-way; Err tells the
    // two cases apart.
    if err := rows.Err(); err != nil {
        fmt.Println("Error iterating pending transactions:", err)
    }
    // Release the result set before the HTTP calls below so the database
    // connection is not held while they run.
    rows.Close()

    for _, p := range pending {
        checkAndFinalizeStatus(p.productID, p.workerEventID)
    }
}

// checkAndFinalizeStatus queries this service's own /checkStatus endpoint for
// the given product and worker event and prints whether the transaction is
// confirmed, cancelled or still pending. Request failures, non-200 responses
// and unparsable bodies are printed and end the check.
func checkAndFinalizeStatus(productID, workerEventID int) {
    resp, err := http.Get(fmt.Sprintf("http://localhost:8081/checkStatus?product_id=%d&worker_event_id=%d", productID, workerEventID))
    if err != nil {
        fmt.Println("Error checking status for product", productID, ":", err)
        return
    }
    defer resp.Body.Close()

    if resp.StatusCode != http.StatusOK {
        fmt.Println("Unexpected response checking status for product", productID, ":", resp.Status)
        return
    }

    // The body has the form "Status in Service B: <status>".
    var status string
    if _, err := fmt.Fscanf(resp.Body, "Status in Service B: %s", &status); err != nil {
        fmt.Println("Error reading response body:", err)
        return
    }

    switch status {
    case "confirm":
        // Placeholder: final commit logic goes here.
        fmt.Println("Transaction for product", productID, "confirmed successfully in Service B")
    case "cancel":
        // Placeholder: cancellation logic goes here.
        fmt.Println("Transaction for product", productID, "cancelled in Service B")
    default:
        fmt.Println("Transaction for product", productID, "is still pending in Service B")
    }
}

// TryHandler handles the Try phase for Service B. It reads product_id,
// quantity and worker_event_id from the query string, deducts quantity from
// the product's stock and records a 'try' row in transactions.
//
// Parameters that are not integers, or a non-positive quantity, get 400 (a
// negative quantity would otherwise add stock). Insufficient stock gets 400
// and database errors get 500. The stock update and the record insert run in
// one database transaction so a failed insert does not leave stock deducted
// without a record to cancel it.
func TryHandler(w http.ResponseWriter, r *http.Request) {
    q := r.URL.Query()
    productID, err := strconv.Atoi(q.Get("product_id"))
    if err != nil {
        http.Error(w, "Invalid product_id in Service B", http.StatusBadRequest)
        return
    }
    quantity, err := strconv.Atoi(q.Get("quantity"))
    if err != nil || quantity <= 0 {
        http.Error(w, "Invalid quantity in Service B", http.StatusBadRequest)
        return
    }
    workerEventID, err := strconv.Atoi(q.Get("worker_event_id"))
    if err != nil {
        http.Error(w, "Invalid worker_event_id in Service B", http.StatusBadRequest)
        return
    }

    tx, err := db.Begin()
    if err != nil {
        http.Error(w, "Try failed in Service B", http.StatusInternalServerError)
        return
    }
    // After a successful Commit this Rollback only returns sql.ErrTxDone.
    defer tx.Rollback()

    // Reserve the stock; the stock >= ? guard prevents overselling.
    res, err := tx.Exec("UPDATE products SET stock = stock - ? WHERE id = ? AND stock >= ?", quantity, productID, quantity)
    if err != nil {
        http.Error(w, "Try failed in Service B", http.StatusInternalServerError)
        return
    }
    rowsAffected, err := res.RowsAffected()
    if err != nil {
        http.Error(w, "Try failed in Service B", http.StatusInternalServerError)
        return
    }
    if rowsAffected == 0 {
        http.Error(w, "Not enough stock in Service B", http.StatusBadRequest)
        return
    }

    if _, err := tx.Exec("INSERT INTO transactions (service, product_id, quantity, status, worker_event_id) VALUES ('ServiceB', ?, ?, 'try', ?)", productID, quantity, workerEventID); err != nil {
        http.Error(w, "Try transaction record failed in Service B", http.StatusInternalServerError)
        return
    }
    if err := tx.Commit(); err != nil {
        http.Error(w, "Try transaction record failed in Service B", http.StatusInternalServerError)
        return
    }

    fmt.Fprintf(w, "Try in Service B successful")
}

// ConfirmHandler moves the Service B transaction identified by product_id and
// worker_event_id from 'try' to 'confirm_pending'. It answers 400 for
// non-integer parameters, 404 when no row in the 'try' state matched, and 500
// on a database error.
func ConfirmHandler(w http.ResponseWriter, r *http.Request) {
    q := r.URL.Query()
    productID, err := strconv.Atoi(q.Get("product_id"))
    if err != nil {
        http.Error(w, "Invalid product_id in Service B", http.StatusBadRequest)
        return
    }
    workerEventID, err := strconv.Atoi(q.Get("worker_event_id"))
    if err != nil {
        http.Error(w, "Invalid worker_event_id in Service B", http.StatusBadRequest)
        return
    }

    res, err := db.Exec("UPDATE transactions SET status='confirm_pending' WHERE service='ServiceB' AND product_id=? AND worker_event_id=? AND status='try'", productID, workerEventID)
    if err != nil {
        http.Error(w, "Confirm failed in Service B", http.StatusInternalServerError)
        return
    }
    n, err := res.RowsAffected()
    if err != nil {
        http.Error(w, "Confirm failed in Service B", http.StatusInternalServerError)
        return
    }
    // Zero affected rows means there was nothing in the 'try' state to confirm.
    if n == 0 {
        http.Error(w, "No try transaction to confirm in Service B", http.StatusNotFound)
        return
    }

    fmt.Fprintf(w, "Confirm in Service B successful")
}

// FinalizeHandler moves the Service B transaction identified by product_id
// and worker_event_id from 'confirm_pending' to 'confirm'. It answers 400 for
// non-integer parameters, 404 when no 'confirm_pending' row matched, and 500
// on a database error.
func FinalizeHandler(w http.ResponseWriter, r *http.Request) {
    q := r.URL.Query()
    productID, err := strconv.Atoi(q.Get("product_id"))
    if err != nil {
        http.Error(w, "Invalid product_id in Service B", http.StatusBadRequest)
        return
    }
    workerEventID, err := strconv.Atoi(q.Get("worker_event_id"))
    if err != nil {
        http.Error(w, "Invalid worker_event_id in Service B", http.StatusBadRequest)
        return
    }

    res, err := db.Exec("UPDATE transactions SET status='confirm' WHERE service='ServiceB' AND product_id=? AND worker_event_id=? AND status='confirm_pending'", productID, workerEventID)
    if err != nil {
        http.Error(w, "Finalize failed in Service B", http.StatusInternalServerError)
        return
    }
    n, err := res.RowsAffected()
    if err != nil {
        http.Error(w, "Finalize failed in Service B", http.StatusInternalServerError)
        return
    }
    // Zero affected rows means nothing was waiting in 'confirm_pending'.
    if n == 0 {
        http.Error(w, "No confirm_pending transaction to finalize in Service B", http.StatusNotFound)
        return
    }

    fmt.Fprintf(w, "Finalize in Service B successful")
}

// CancelHandler handles the Cancel phase for Service B. It marks the matching
// 'try' or 'confirm_pending' transaction as 'cancel' and gives the reserved
// quantity back to the product's stock.
//
// Stock is only restored when a transaction row actually changed state, so a
// repeated cancel, or a cancel for a Try that never reserved anything, is a
// successful no-op instead of inflating stock. Both statements run in one
// database transaction. Non-integer parameters or a non-positive quantity get
// 400; database errors get 500.
func CancelHandler(w http.ResponseWriter, r *http.Request) {
    q := r.URL.Query()
    productID, err := strconv.Atoi(q.Get("product_id"))
    if err != nil {
        http.Error(w, "Invalid product_id in Service B", http.StatusBadRequest)
        return
    }
    quantity, err := strconv.Atoi(q.Get("quantity"))
    if err != nil || quantity <= 0 {
        http.Error(w, "Invalid quantity in Service B", http.StatusBadRequest)
        return
    }
    workerEventID, err := strconv.Atoi(q.Get("worker_event_id"))
    if err != nil {
        http.Error(w, "Invalid worker_event_id in Service B", http.StatusBadRequest)
        return
    }

    tx, err := db.Begin()
    if err != nil {
        http.Error(w, "Cancel failed in Service B", http.StatusInternalServerError)
        return
    }
    // After a successful Commit this Rollback only returns sql.ErrTxDone.
    defer tx.Rollback()

    res, err := tx.Exec("UPDATE transactions SET status='cancel' WHERE service='ServiceB' AND product_id=? AND worker_event_id=? AND status IN ('try', 'confirm_pending')", productID, workerEventID)
    if err != nil {
        http.Error(w, "Cancel transaction record failed in Service B", http.StatusInternalServerError)
        return
    }
    cancelled, err := res.RowsAffected()
    if err != nil {
        http.Error(w, "Cancel transaction record failed in Service B", http.StatusInternalServerError)
        return
    }

    // Restore quantity once per reservation that was just cancelled.
    if cancelled > 0 {
        if _, err := tx.Exec("UPDATE products SET stock = stock + ? WHERE id = ?", int64(quantity)*cancelled, productID); err != nil {
            http.Error(w, "Cancel failed in Service B", http.StatusInternalServerError)
            return
        }
    }
    if err := tx.Commit(); err != nil {
        http.Error(w, "Cancel failed in Service B", http.StatusInternalServerError)
        return
    }

    fmt.Fprintf(w, "Cancel in Service B successful")
}

func CheckStatusHandler(w http.ResponseWriter, r *http.Request) {
    productID, _ := strconv.Atoi(r.URL.Query().Get("product_id"))
    workerEventID, _ := strconv.Atoi(r.URL.Query().Get("worker_event_id"))
    var status string

    err := db.QueryRow("SELECT status FROM transactions WHERE service='ServiceB' AND product_id=? AND worker_event_id=? ORDER BY id DESC LIMIT 1", productID, workerEventID).Scan(&status)
    if err != nil {
        http.Error(w, "Failed to check status in Service B", http.StatusInternalServerError)
        return
    }

    fmt.Fprintf(w, "Status in Service B: %s", status)
}
}

 

TCC Controller

package main

import (
    "database/sql"
    "fmt"
    "net/http"
    "io/ioutil"
    "strconv"

    _ "github.com/go-sql-driver/mysql"
)

var db *sql.DB

// main connects to MySQL, creates one worker event and runs a single TCC
// transaction (product 1, quantity 10) against it, printing the outcome.
func main() {
    conn, err := sql.Open("mysql", "user:password@tcp(127.0.0.1:3306)/tcc_example")
    db = conn
    if err != nil {
        panic(err)
    }
    defer db.Close()

    workerEventID, err := createWorkerEvent()
    if err != nil {
        fmt.Println("Failed to create worker event:", err)
        return
    }

    if err := ProcessTCC(workerEventID, 1, 10); err != nil {
        fmt.Println("Transaction failed:", err)
        return
    }
    fmt.Println("Transaction succeeded")
}

// createWorkerEvent inserts a worker_events row with status 'pending' and
// returns its auto-generated id. On any error it returns 0 and the error.
func createWorkerEvent() (int, error) {
    var id int64
    result, err := db.Exec("INSERT INTO worker_events (status) VALUES ('pending')")
    if err == nil {
        id, err = result.LastInsertId()
    }
    if err != nil {
        return 0, err
    }
    return int(id), nil
}

// updateWorkerEventStatus sets the status column of the worker_events row
// with the given id and returns the error from the UPDATE, if any.
func updateWorkerEventStatus(workerEventID int, status string) error {
    const stmt = "UPDATE worker_events SET status=? WHERE id=?"
    if _, err := db.Exec(stmt, status, workerEventID); err != nil {
        return err
    }
    return nil
}

// ProcessTCC drives one TCC transaction through the Try, Confirm and finalize
// phases, recording the controller's own progress inside a database
// transaction.
//
// When a phase fails, the database transaction is rolled back, the services
// are asked to cancel (up to 3 attempts), the worker event is marked 'failed'
// and the phase error is returned. On success the database transaction is
// committed and the worker event is marked 'completed'. The returned error is
// the failing phase's error or the Commit error; failures of the bookkeeping
// steps (rollback, cancel, status update) are printed.
func ProcessTCC(workerEventID, productID, quantity int) error {
    tx, err := db.Begin()
    if err != nil {
        return err
    }

    // abort undoes the attempt after a failed phase and returns cause so the
    // caller can propagate the original error.
    abort := func(cause error, compensate func(workerEventID, productID, quantity, retries int) error) error {
        if rbErr := tx.Rollback(); rbErr != nil {
            fmt.Println("Rollback failed:", rbErr)
        }
        if cErr := compensate(workerEventID, productID, quantity, 3); cErr != nil {
            fmt.Println("Final Cancel failed after retries:", cErr)
        }
        if uErr := updateWorkerEventStatus(workerEventID, "failed"); uErr != nil {
            fmt.Println("Failed to mark worker event as failed:", uErr)
        }
        return cause
    }

    // Try phase
    if err := try(tx, workerEventID, productID, quantity); err != nil {
        return abort(err, retryCancel)
    }

    // Confirm phase
    if err := confirm(tx, workerEventID, productID); err != nil {
        return abort(err, retryCancel)
    }

    // Final confirmation phase
    if err := finalizeConfirm(tx, workerEventID, productID); err != nil {
        return abort(err, retryCancelPending)
    }

    // Commit before touching worker_events: updateWorkerEventStatus uses a
    // different connection, and the open transaction has inserted a row whose
    // foreign key references this worker_events row. On InnoDB that check
    // holds a shared lock on the parent row until the transaction ends, so an
    // earlier UPDATE would block on it. Committing first also avoids marking
    // the event 'completed' when the commit itself fails.
    if err := tx.Commit(); err != nil {
        if uErr := updateWorkerEventStatus(workerEventID, "failed"); uErr != nil {
            fmt.Println("Failed to mark worker event as failed:", uErr)
        }
        return err
    }
    if uErr := updateWorkerEventStatus(workerEventID, "completed"); uErr != nil {
        fmt.Println("Failed to mark worker event as completed:", uErr)
    }
    return nil
}

// try calls the /try endpoint of Service A and then Service B, stopping at the
// first error, and afterwards records a 'try' row for the controller inside tx.
func try(tx *sql.Tx, workerEventID, productID, quantity int) error {
    endpoints := []string{
        "http://localhost:8080/try",
        "http://localhost:8081/try",
    }
    for _, endpoint := range endpoints {
        if err := callService(endpoint, productID, quantity, workerEventID); err != nil {
            return err
        }
    }

    _, err := tx.Exec("INSERT INTO transactions (service, product_id, quantity, status, worker_event_id) VALUES ('TCCController', ?, ?, 'try', ?)", productID, quantity, workerEventID)
    return err
}

// confirm calls the /confirm endpoint of Service A and then Service B (with
// quantity 0), stopping at the first error, and afterwards moves the
// controller's own row from 'try' to 'confirm_pending' inside tx.
func confirm(tx *sql.Tx, workerEventID, productID int) error {
    endpoints := []string{
        "http://localhost:8080/confirm",
        "http://localhost:8081/confirm",
    }
    for _, endpoint := range endpoints {
        if err := callService(endpoint, productID, 0, workerEventID); err != nil {
            return err
        }
    }

    _, err := tx.Exec("UPDATE transactions SET status='confirm_pending' WHERE service='TCCController' AND product_id=? AND worker_event_id=? AND status='try'", productID, workerEventID)
    return err
}

// finalizeConfirm calls the /finalize endpoint of Service A and then Service B
// (with quantity 0), stopping at the first error, and afterwards moves the
// controller's own row from 'confirm_pending' to 'confirm' inside tx.
func finalizeConfirm(tx *sql.Tx, workerEventID, productID int) error {
    endpoints := []string{
        "http://localhost:8080/finalize",
        "http://localhost:8081/finalize",
    }
    for _, endpoint := range endpoints {
        if err := callService(endpoint, productID, 0, workerEventID); err != nil {
            return err
        }
    }

    _, err := tx.Exec("UPDATE transactions SET status='confirm' WHERE service='TCCController' AND product_id=? AND worker_event_id=? AND status='confirm_pending'", productID, workerEventID)
    return err
}

// retryCancel invokes cancel up to retries times and returns nil as soon as
// one attempt succeeds. Each failed attempt is printed; if every attempt
// fails the last error is returned. With retries <= 0 nothing is attempted
// and nil is returned.
func retryCancel(workerEventID, productID, quantity int, retries int) error {
    var lastErr error
    for attempt := 1; attempt <= retries; attempt++ {
        lastErr = cancel(workerEventID, productID, quantity)
        if lastErr == nil {
            return nil
        }
        fmt.Printf("Cancel attempt %d failed: %v\n", attempt, lastErr)
    }
    return lastErr
}

// retryCancelPending invokes cancelPending up to retries times and returns
// nil as soon as one attempt succeeds. Each failed attempt is printed; if
// every attempt fails the last error is returned. With retries <= 0 nothing
// is attempted and nil is returned.
func retryCancelPending(workerEventID, productID, quantity int, retries int) error {
    var lastErr error
    for attempt := 1; attempt <= retries; attempt++ {
        lastErr = cancelPending(workerEventID, productID, quantity)
        if lastErr == nil {
            return nil
        }
        fmt.Printf("Cancel pending attempt %d failed: %v\n", attempt, lastErr)
    }
    return lastErr
}

// cancel calls the /cancel endpoint of Service A and then Service B, stopping
// at the first error, and afterwards marks the controller's own 'try' row as
// 'cancel' using the shared db handle.
func cancel(workerEventID, productID, quantity int) error {
    endpoints := []string{
        "http://localhost:8080/cancel",
        "http://localhost:8081/cancel",
    }
    for _, endpoint := range endpoints {
        if err := callService(endpoint, productID, quantity, workerEventID); err != nil {
            return err
        }
    }

    _, err := db.Exec("UPDATE transactions SET status='cancel' WHERE service='TCCController' AND product_id=? AND worker_event_id=? AND status='try'", productID, workerEventID)
    return err
}

// cancelPending calls the /cancel endpoint of Service A and then Service B,
// stopping at the first error, and afterwards marks the controller's own
// 'confirm_pending' row as 'cancel' using the shared db handle.
func cancelPending(workerEventID, productID, quantity int) error {
    endpoints := []string{
        "http://localhost:8080/cancel",
        "http://localhost:8081/cancel",
    }
    for _, endpoint := range endpoints {
        if err := callService(endpoint, productID, quantity, workerEventID); err != nil {
            return err
        }
    }

    _, err := db.Exec("UPDATE transactions SET status='cancel' WHERE service='TCCController' AND product_id=? AND worker_event_id=? AND status='confirm_pending'", productID, workerEventID)
    return err
}

// callService issues a GET to url with product_id, quantity and
// worker_event_id appended as query parameters and prints the response body.
//
// It returns an error when the request fails, the body cannot be read, or the
// service answers with a status outside 2xx. Without the status check a
// rejected Try (for example "Not enough stock", 400) would be treated as
// success and the caller would never cancel.
func callService(url string, productID, quantity, workerEventID int) error {
    fullURL := url +
        "?product_id=" + strconv.Itoa(productID) +
        "&quantity=" + strconv.Itoa(quantity) +
        "&worker_event_id=" + strconv.Itoa(workerEventID)

    resp, err := http.Get(fullURL)
    if err != nil {
        return err
    }
    defer resp.Body.Close()

    body, err := ioutil.ReadAll(resp.Body)
    if err != nil {
        return err
    }
    fmt.Println(string(body))

    if resp.StatusCode < 200 || resp.StatusCode > 299 {
        return fmt.Errorf("%s returned %s", url, resp.Status)
    }
    return nil
}

 

 

 

Sagaパターン

  • worker_eventsテーブルにerror_messageカラムを追加し、各ステップでエラーが発生した場合にエラーメッセージを保存します。
  • updateWorkerEventStatus関数は、ステータスとエラーメッセージの両方を更新します。
  • 通常のトランザクションが成功した場合はステータスを「completed」に更新し、補償トランザクションが成功した場合はステータスを「compensated」に更新します。
  • 各ステップでエラーが発生した際に、どのステップでエラーが発生したかをエラーメッセージとして保存します。
  • スケジュールジョブ(scheduleCompensation)は、定期的に失敗したトランザクションを確認し、補償トランザクションを実行します。補償トランザクションがすべて成功すると、ステータスを「compensated」に更新します。

 

データベースセットアップ

CREATE DATABASE sage_example;

USE sage_example;

CREATE TABLE products (
    id INT AUTO_INCREMENT PRIMARY KEY,
    name VARCHAR(255) NOT NULL,
    stock INT NOT NULL
);

-- 'compensated' is part of the ENUM because the controller's compensation job
-- sets worker_events.status to that value once every step has been undone.
CREATE TABLE worker_events (
    id INT AUTO_INCREMENT PRIMARY KEY,
    status ENUM('pending', 'completed', 'failed', 'compensated') NOT NULL,
    error_message TEXT
);

CREATE TABLE transactions (
    id INT AUTO_INCREMENT PRIMARY KEY,
    service VARCHAR(255) NOT NULL,
    product_id INT NOT NULL,
    quantity INT NOT NULL,
    status ENUM('try', 'confirm', 'cancel', 'compensated') NOT NULL,
    worker_event_id INT NOT NULL,
    FOREIGN KEY (product_id) REFERENCES products(id),
    FOREIGN KEY (worker_event_id) REFERENCES worker_events(id)
);

-- Seed data
INSERT INTO products (name, stock) VALUES ('ProductA', 100), ('ProductB', 200);

ServiceA

package main

import (
    "database/sql"
    "fmt"
    "net/http"
    "strconv"

    _ "github.com/go-sql-driver/mysql"
)

var db *sql.DB

// main opens the MySQL handle, registers the reserve/confirm/cancel endpoints
// of Service A and serves HTTP on :8080.
func main() {
    var err error
    db, err = sql.Open("mysql", "user:password@tcp(127.0.0.1:3306)/sage_example")
    if err != nil {
        panic(err)
    }
    defer db.Close()

    http.HandleFunc("/reserve", ReserveHandler)
    http.HandleFunc("/confirm", ConfirmHandler)
    http.HandleFunc("/cancel", CancelHandler)

    fmt.Println("Service A is running on port 8080")
    // ListenAndServe only returns with a non-nil error (for example when the
    // port is already taken), so report it instead of exiting silently.
    if err := http.ListenAndServe(":8080", nil); err != nil {
        fmt.Println("Service A stopped:", err)
    }
}

// ReserveHandler is the reserve step of Service A. It reads product_id,
// quantity and worker_event_id from the query string, deducts quantity from
// the product's stock and records a 'try' row in transactions.
//
// Parameters that are not integers, or a non-positive quantity, get 400 (a
// negative quantity would otherwise add stock). Insufficient stock gets 400
// and database errors get 500. The stock update and the record insert run in
// one database transaction so a failed insert does not leave stock deducted
// without a record to compensate.
func ReserveHandler(w http.ResponseWriter, r *http.Request) {
    q := r.URL.Query()
    productID, err := strconv.Atoi(q.Get("product_id"))
    if err != nil {
        http.Error(w, "Invalid product_id in Service A", http.StatusBadRequest)
        return
    }
    quantity, err := strconv.Atoi(q.Get("quantity"))
    if err != nil || quantity <= 0 {
        http.Error(w, "Invalid quantity in Service A", http.StatusBadRequest)
        return
    }
    workerEventID, err := strconv.Atoi(q.Get("worker_event_id"))
    if err != nil {
        http.Error(w, "Invalid worker_event_id in Service A", http.StatusBadRequest)
        return
    }

    tx, err := db.Begin()
    if err != nil {
        http.Error(w, "Reserve failed in Service A", http.StatusInternalServerError)
        return
    }
    // After a successful Commit this Rollback only returns sql.ErrTxDone.
    defer tx.Rollback()

    // The stock >= ? guard prevents overselling.
    res, err := tx.Exec("UPDATE products SET stock = stock - ? WHERE id = ? AND stock >= ?", quantity, productID, quantity)
    if err != nil {
        http.Error(w, "Reserve failed in Service A", http.StatusInternalServerError)
        return
    }
    rowsAffected, err := res.RowsAffected()
    if err != nil {
        http.Error(w, "Reserve failed in Service A", http.StatusInternalServerError)
        return
    }
    if rowsAffected == 0 {
        http.Error(w, "Not enough stock in Service A", http.StatusBadRequest)
        return
    }

    if _, err := tx.Exec("INSERT INTO transactions (service, product_id, quantity, status, worker_event_id) VALUES ('ServiceA', ?, ?, 'try', ?)", productID, quantity, workerEventID); err != nil {
        http.Error(w, "Reserve transaction record failed in Service A", http.StatusInternalServerError)
        return
    }
    if err := tx.Commit(); err != nil {
        http.Error(w, "Reserve transaction record failed in Service A", http.StatusInternalServerError)
        return
    }

    fmt.Fprintf(w, "Reserve in Service A successful")
}

// ConfirmHandler moves the Service A transaction identified by product_id and
// worker_event_id from 'try' to 'confirm'. It answers 400 for non-integer
// parameters, 404 when no row in the 'try' state matched, and 500 on a
// database error.
func ConfirmHandler(w http.ResponseWriter, r *http.Request) {
    q := r.URL.Query()
    productID, err := strconv.Atoi(q.Get("product_id"))
    if err != nil {
        http.Error(w, "Invalid product_id in Service A", http.StatusBadRequest)
        return
    }
    workerEventID, err := strconv.Atoi(q.Get("worker_event_id"))
    if err != nil {
        http.Error(w, "Invalid worker_event_id in Service A", http.StatusBadRequest)
        return
    }

    res, err := db.Exec("UPDATE transactions SET status='confirm' WHERE service='ServiceA' AND product_id=? AND worker_event_id=? AND status='try'", productID, workerEventID)
    if err != nil {
        http.Error(w, "Confirm failed in Service A", http.StatusInternalServerError)
        return
    }
    n, err := res.RowsAffected()
    if err != nil {
        http.Error(w, "Confirm failed in Service A", http.StatusInternalServerError)
        return
    }
    // Zero affected rows means there was no reservation to confirm.
    if n == 0 {
        http.Error(w, "No reservation to confirm in Service A", http.StatusNotFound)
        return
    }

    fmt.Fprintf(w, "Confirm in Service A successful")
}

// CancelHandler is the compensating step of Service A. It marks the matching
// 'try' or 'confirm' transaction as 'compensated' and gives the reserved
// quantity back to the product's stock.
//
// Stock is only restored when a transaction row actually changed state, so a
// repeated compensation, or one for a reservation that never happened, is a
// successful no-op instead of inflating stock. Both statements run in one
// database transaction. Non-integer parameters or a non-positive quantity get
// 400; database errors get 500.
func CancelHandler(w http.ResponseWriter, r *http.Request) {
    q := r.URL.Query()
    productID, err := strconv.Atoi(q.Get("product_id"))
    if err != nil {
        http.Error(w, "Invalid product_id in Service A", http.StatusBadRequest)
        return
    }
    quantity, err := strconv.Atoi(q.Get("quantity"))
    if err != nil || quantity <= 0 {
        http.Error(w, "Invalid quantity in Service A", http.StatusBadRequest)
        return
    }
    workerEventID, err := strconv.Atoi(q.Get("worker_event_id"))
    if err != nil {
        http.Error(w, "Invalid worker_event_id in Service A", http.StatusBadRequest)
        return
    }

    tx, err := db.Begin()
    if err != nil {
        http.Error(w, "Cancel failed in Service A", http.StatusInternalServerError)
        return
    }
    // After a successful Commit this Rollback only returns sql.ErrTxDone.
    defer tx.Rollback()

    res, err := tx.Exec("UPDATE transactions SET status='compensated' WHERE service='ServiceA' AND product_id=? AND worker_event_id=? AND status IN ('try', 'confirm')", productID, workerEventID)
    if err != nil {
        http.Error(w, "Cancel transaction record failed in Service A", http.StatusInternalServerError)
        return
    }
    compensated, err := res.RowsAffected()
    if err != nil {
        http.Error(w, "Cancel transaction record failed in Service A", http.StatusInternalServerError)
        return
    }

    // Restore quantity once per reservation that was just compensated.
    if compensated > 0 {
        if _, err := tx.Exec("UPDATE products SET stock = stock + ? WHERE id = ?", int64(quantity)*compensated, productID); err != nil {
            http.Error(w, "Cancel failed in Service A", http.StatusInternalServerError)
            return
        }
    }
    if err := tx.Commit(); err != nil {
        http.Error(w, "Cancel failed in Service A", http.StatusInternalServerError)
        return
    }

    fmt.Fprintf(w, "Cancel in Service A successful")
}

ServiceB

package main

import (
    "database/sql"
    "fmt"
    "net/http"
    "strconv"

    _ "github.com/go-sql-driver/mysql"
)

var db *sql.DB

// main opens the MySQL handle, registers the reserve/confirm/cancel endpoints
// of Service B and serves HTTP on :8081.
func main() {
    var err error
    db, err = sql.Open("mysql", "user:password@tcp(127.0.0.1:3306)/sage_example")
    if err != nil {
        panic(err)
    }
    defer db.Close()

    http.HandleFunc("/reserve", ReserveHandler)
    http.HandleFunc("/confirm", ConfirmHandler)
    http.HandleFunc("/cancel", CancelHandler)

    fmt.Println("Service B is running on port 8081")
    // ListenAndServe only returns with a non-nil error (for example when the
    // port is already taken), so report it instead of exiting silently.
    if err := http.ListenAndServe(":8081", nil); err != nil {
        fmt.Println("Service B stopped:", err)
    }
}

// ReserveHandler is the reserve step of Service B. It reads product_id,
// quantity and worker_event_id from the query string, deducts quantity from
// the product's stock and records a 'try' row in transactions.
//
// Parameters that are not integers, or a non-positive quantity, get 400 (a
// negative quantity would otherwise add stock). Insufficient stock gets 400
// and database errors get 500. The stock update and the record insert run in
// one database transaction so a failed insert does not leave stock deducted
// without a record to compensate.
func ReserveHandler(w http.ResponseWriter, r *http.Request) {
    q := r.URL.Query()
    productID, err := strconv.Atoi(q.Get("product_id"))
    if err != nil {
        http.Error(w, "Invalid product_id in Service B", http.StatusBadRequest)
        return
    }
    quantity, err := strconv.Atoi(q.Get("quantity"))
    if err != nil || quantity <= 0 {
        http.Error(w, "Invalid quantity in Service B", http.StatusBadRequest)
        return
    }
    workerEventID, err := strconv.Atoi(q.Get("worker_event_id"))
    if err != nil {
        http.Error(w, "Invalid worker_event_id in Service B", http.StatusBadRequest)
        return
    }

    tx, err := db.Begin()
    if err != nil {
        http.Error(w, "Reserve failed in Service B", http.StatusInternalServerError)
        return
    }
    // After a successful Commit this Rollback only returns sql.ErrTxDone.
    defer tx.Rollback()

    // The stock >= ? guard prevents overselling.
    res, err := tx.Exec("UPDATE products SET stock = stock - ? WHERE id = ? AND stock >= ?", quantity, productID, quantity)
    if err != nil {
        http.Error(w, "Reserve failed in Service B", http.StatusInternalServerError)
        return
    }
    rowsAffected, err := res.RowsAffected()
    if err != nil {
        http.Error(w, "Reserve failed in Service B", http.StatusInternalServerError)
        return
    }
    if rowsAffected == 0 {
        http.Error(w, "Not enough stock in Service B", http.StatusBadRequest)
        return
    }

    if _, err := tx.Exec("INSERT INTO transactions (service, product_id, quantity, status, worker_event_id) VALUES ('ServiceB', ?, ?, 'try', ?)", productID, quantity, workerEventID); err != nil {
        http.Error(w, "Reserve transaction record failed in Service B", http.StatusInternalServerError)
        return
    }
    if err := tx.Commit(); err != nil {
        http.Error(w, "Reserve transaction record failed in Service B", http.StatusInternalServerError)
        return
    }

    fmt.Fprintf(w, "Reserve in Service B successful")
}

// ConfirmHandler moves the Service B transaction identified by product_id and
// worker_event_id from 'try' to 'confirm'. It answers 400 for non-integer
// parameters, 404 when no row in the 'try' state matched, and 500 on a
// database error.
func ConfirmHandler(w http.ResponseWriter, r *http.Request) {
    q := r.URL.Query()
    productID, err := strconv.Atoi(q.Get("product_id"))
    if err != nil {
        http.Error(w, "Invalid product_id in Service B", http.StatusBadRequest)
        return
    }
    workerEventID, err := strconv.Atoi(q.Get("worker_event_id"))
    if err != nil {
        http.Error(w, "Invalid worker_event_id in Service B", http.StatusBadRequest)
        return
    }

    res, err := db.Exec("UPDATE transactions SET status='confirm' WHERE service='ServiceB' AND product_id=? AND worker_event_id=? AND status='try'", productID, workerEventID)
    if err != nil {
        http.Error(w, "Confirm failed in Service B", http.StatusInternalServerError)
        return
    }
    n, err := res.RowsAffected()
    if err != nil {
        http.Error(w, "Confirm failed in Service B", http.StatusInternalServerError)
        return
    }
    // Zero affected rows means there was no reservation to confirm.
    if n == 0 {
        http.Error(w, "No reservation to confirm in Service B", http.StatusNotFound)
        return
    }

    fmt.Fprintf(w, "Confirm in Service B successful")
}

// CancelHandler is the compensating step of Service B. It marks the matching
// 'try' or 'confirm' transaction as 'compensated' and gives the reserved
// quantity back to the product's stock.
//
// Stock is only restored when a transaction row actually changed state, so a
// repeated compensation, or one for a reservation that never happened, is a
// successful no-op instead of inflating stock. Both statements run in one
// database transaction. Non-integer parameters or a non-positive quantity get
// 400; database errors get 500.
func CancelHandler(w http.ResponseWriter, r *http.Request) {
    q := r.URL.Query()
    productID, err := strconv.Atoi(q.Get("product_id"))
    if err != nil {
        http.Error(w, "Invalid product_id in Service B", http.StatusBadRequest)
        return
    }
    quantity, err := strconv.Atoi(q.Get("quantity"))
    if err != nil || quantity <= 0 {
        http.Error(w, "Invalid quantity in Service B", http.StatusBadRequest)
        return
    }
    workerEventID, err := strconv.Atoi(q.Get("worker_event_id"))
    if err != nil {
        http.Error(w, "Invalid worker_event_id in Service B", http.StatusBadRequest)
        return
    }

    tx, err := db.Begin()
    if err != nil {
        http.Error(w, "Cancel failed in Service B", http.StatusInternalServerError)
        return
    }
    // After a successful Commit this Rollback only returns sql.ErrTxDone.
    defer tx.Rollback()

    res, err := tx.Exec("UPDATE transactions SET status='compensated' WHERE service='ServiceB' AND product_id=? AND worker_event_id=? AND status IN ('try', 'confirm')", productID, workerEventID)
    if err != nil {
        http.Error(w, "Cancel transaction record failed in Service B", http.StatusInternalServerError)
        return
    }
    compensated, err := res.RowsAffected()
    if err != nil {
        http.Error(w, "Cancel transaction record failed in Service B", http.StatusInternalServerError)
        return
    }

    // Restore quantity once per reservation that was just compensated.
    if compensated > 0 {
        if _, err := tx.Exec("UPDATE products SET stock = stock + ? WHERE id = ?", int64(quantity)*compensated, productID); err != nil {
            http.Error(w, "Cancel failed in Service B", http.StatusInternalServerError)
            return
        }
    }
    if err := tx.Commit(); err != nil {
        http.Error(w, "Cancel failed in Service B", http.StatusInternalServerError)
        return
    }

    fmt.Fprintf(w, "Cancel in Service B successful")
}

TCC Controller(Sagaパターン用)

package main

import (
    "database/sql"
    "fmt"
    "net/http"
    "io/ioutil"
    "strconv"
    "time"

    _ "github.com/go-sql-driver/mysql"
)

var db *sql.DB

// main connects to MySQL, starts the periodic compensation job, runs a single
// saga (product 1, quantity 10) and then keeps the process alive so the
// compensation job can continue to run.
func main() {
    var err error
    db, err = sql.Open("mysql", "user:password@tcp(127.0.0.1:3306)/sage_example")
    if err != nil {
        panic(err)
    }
    defer db.Close()

    go scheduleCompensation()

    workerEventID, err := createWorkerEvent()
    if err != nil {
        fmt.Println("Failed to create worker event:", err)
        return
    }

    if err := ProcessSage(workerEventID, 1, 10); err != nil {
        fmt.Println("Transaction failed:", err)
    } else {
        fmt.Println("Transaction succeeded")
    }

    // Block forever: returning from main ends the process, which would stop
    // the scheduleCompensation goroutine before its first one-minute sleep
    // has elapsed.
    select {}
}

// createWorkerEvent inserts a worker_events row with status 'pending' and
// returns its auto-generated id. On any error it returns 0 and the error.
func createWorkerEvent() (int, error) {
    var id int64
    result, err := db.Exec("INSERT INTO worker_events (status) VALUES ('pending')")
    if err == nil {
        id, err = result.LastInsertId()
    }
    if err != nil {
        return 0, err
    }
    return int(id), nil
}

// updateWorkerEventStatus writes both status and error_message on the
// worker_events row with the given id and returns the UPDATE error, if any.
func updateWorkerEventStatus(workerEventID int, status, errorMessage string) error {
    const stmt = "UPDATE worker_events SET status=?, error_message=? WHERE id=?"
    if _, err := db.Exec(stmt, status, errorMessage, workerEventID); err != nil {
        return err
    }
    return nil
}

func ProcessSage(workerEventID, productID, quantity int) error {
    if err := reserveServiceA(workerEventID, productID, quantity); err != nil {
        updateWorkerEventStatus(workerEventID, "failed", fmt.Sprintf("Service A reserve failed: %v", err))
        return err
    }

    if err := reserveServiceB(workerEventID, productID, quantity); err != nil {
        cancelServiceA(workerEventID, productID, quantity)
        updateWorkerEventStatus(workerEventID, "failed", fmt.Sprintf("Service B reserve failed: %v. Compensated Service A.", err))
        return err
    }

    if err := confirmServiceA(workerEventID, productID); err != nil {
        cancelServiceB(workerEventID, productID, quantity)
        cancelServiceA(workerEventID, productID, quantity)
        updateWorkerEventStatus(workerEventID, "failed", fmt.Sprintf("Service A confirm failed: %v. Compensated both services.", err))
        return err
    }

    if err := confirmServiceB(workerEventID, productID); err != nil {
        cancelServiceB(workerEventID, productID, quantity)
        cancelServiceA(workerEventID, productID, quantity)
        updateWorkerEventStatus(workerEventID, "failed", fmt.Sprintf("Service B confirm failed: %v. Compensated both services.", err))
        return err
    }

    updateWorkerEventStatus(workerEventID, "completed", "")
    return nil
}

// reserveServiceA calls Service A's /reserve endpoint through callService.
func reserveServiceA(workerEventID, productID, quantity int) error {
    const endpoint = "http://localhost:8080/reserve"
    return callService(endpoint, productID, quantity, workerEventID)
}

// reserveServiceB calls Service B's /reserve endpoint through callService.
func reserveServiceB(workerEventID, productID, quantity int) error {
    const endpoint = "http://localhost:8081/reserve"
    return callService(endpoint, productID, quantity, workerEventID)
}

// confirmServiceA calls Service A's /confirm endpoint through callService,
// passing 0 as the quantity.
func confirmServiceA(workerEventID, productID int) error {
    const endpoint = "http://localhost:8080/confirm"
    return callService(endpoint, productID, 0, workerEventID)
}

// confirmServiceB calls Service B's /confirm endpoint through callService,
// passing 0 as the quantity.
func confirmServiceB(workerEventID, productID int) error {
    const endpoint = "http://localhost:8081/confirm"
    return callService(endpoint, productID, 0, workerEventID)
}

// cancelServiceA calls Service A's /cancel endpoint through callService.
func cancelServiceA(workerEventID, productID, quantity int) error {
    const endpoint = "http://localhost:8080/cancel"
    return callService(endpoint, productID, quantity, workerEventID)
}

// cancelServiceB calls Service B's /cancel endpoint through callService.
func cancelServiceB(workerEventID, productID, quantity int) error {
    const endpoint = "http://localhost:8081/cancel"
    return callService(endpoint, productID, quantity, workerEventID)
}

// callService issues a GET to url with product_id, quantity and
// worker_event_id appended as query parameters and prints the response body.
//
// It returns an error when the request fails, the body cannot be read, or the
// service answers with a status outside 2xx. Without the status check a
// rejected step (for example "Not enough stock", 400) would be treated as
// success and the saga would never compensate.
func callService(url string, productID, quantity, workerEventID int) error {
    fullURL := url +
        "?product_id=" + strconv.Itoa(productID) +
        "&quantity=" + strconv.Itoa(quantity) +
        "&worker_event_id=" + strconv.Itoa(workerEventID)

    resp, err := http.Get(fullURL)
    if err != nil {
        return err
    }
    defer resp.Body.Close()

    body, err := ioutil.ReadAll(resp.Body)
    if err != nil {
        return err
    }
    fmt.Println(string(body))

    if resp.StatusCode < 200 || resp.StatusCode > 299 {
        return fmt.Errorf("%s returned %s", url, resp.Status)
    }
    return nil
}

// scheduleCompensation loops forever: it sleeps one minute and then runs
// processCompensation. It never returns.
func scheduleCompensation() {
    const interval = 1 * time.Minute
    for {
        time.Sleep(interval)
        processCompensation()
    }
}

// processCompensation looks up every worker event whose status is 'failed'
// and runs compensateWorkerEvent on each. Query, scan and iteration errors
// are printed.
func processCompensation() {
    rows, err := db.Query("SELECT id, error_message FROM worker_events WHERE status='failed'")
    if err != nil {
        fmt.Println("Failed to fetch failed events:", err)
        return
    }

    type failedEvent struct {
        id int
        // error_message is a nullable TEXT column; scanning NULL into a plain
        // string would fail and skip the event.
        message sql.NullString
    }
    var events []failedEvent
    for rows.Next() {
        var ev failedEvent
        if err := rows.Scan(&ev.id, &ev.message); err != nil {
            fmt.Println("Failed to scan worker event:", err)
            continue
        }
        events = append(events, ev)
    }
    // Next also returns false when iteration failed part-way; Err tells the
    // two cases apart.
    if err := rows.Err(); err != nil {
        fmt.Println("Failed to read failed events:", err)
    }
    // Release the result set before compensating, which issues further
    // queries and HTTP calls.
    rows.Close()

    for _, ev := range events {
        fmt.Printf("Retrying compensation for worker event %d: %s\n", ev.id, ev.message.String)
        compensateWorkerEvent(ev.id)
    }
}

// compensateWorkerEvent cancels every transaction of the given worker event
// that is still in the 'try' or 'confirm' state, calling cancelServiceA or
// cancelServiceB according to the row's service column. Rows for any other
// service are skipped.
//
// The event is marked 'compensated' only when every row could be read and
// every cancel call succeeded; a scan or iteration error therefore leaves the
// event 'failed' so a later run retries it.
func compensateWorkerEvent(workerEventID int) {
    rows, err := db.Query("SELECT service, product_id, quantity, status FROM transactions WHERE worker_event_id=? AND status IN ('try', 'confirm')", workerEventID)
    if err != nil {
        fmt.Println("Failed to fetch transactions for worker event:", err)
        return
    }

    type openTx struct {
        service, status     string
        productID, quantity int
    }
    var open []openTx
    allCompensated := true

    for rows.Next() {
        var t openTx
        if err := rows.Scan(&t.service, &t.productID, &t.quantity, &t.status); err != nil {
            fmt.Println("Failed to scan transaction:", err)
            // An unreadable row may still need compensation.
            allCompensated = false
            continue
        }
        open = append(open, t)
    }
    if err := rows.Err(); err != nil {
        fmt.Println("Failed to read transactions for worker event:", err)
        allCompensated = false
    }
    // Release the result set before the HTTP calls below.
    rows.Close()

    for _, t := range open {
        var cancelFn func(workerEventID, productID, quantity int) error
        switch t.service {
        case "ServiceA":
            cancelFn = cancelServiceA
        case "ServiceB":
            cancelFn = cancelServiceB
        default:
            continue
        }
        if err := cancelFn(workerEventID, t.productID, t.quantity); err != nil {
            fmt.Printf("Failed to compensate %s %s for worker event %d: %v\n", t.service, t.status, workerEventID, err)
            allCompensated = false
        }
    }

    if !allCompensated {
        return
    }
    if err := updateWorkerEventStatus(workerEventID, "compensated", ""); err != nil {
        fmt.Printf("Failed to update worker event %d to compensated: %v\n", workerEventID, err)
    }
}

 

 

 

 

Amazonおすすめ

iPad 9世代 2021年最新作

iPad 9世代出たから買い替え。安いぞ!🐱 初めてならiPad。Kindleを外で見るならiPad mini。ほとんどの人には通常のiPadをおすすめします><

コメントを残す

メールアドレスが公開されることはありません。 * が付いている欄は必須項目です

日本語が含まれない投稿は無視されますのでご注意ください。(スパム対策)