iT邦幫忙

2021 iThome 鐵人賽

DAY 17
0
Software Development

MYSQL-相關實務操作學習紀錄系列 第 17

Day.17 應用中學習 - 實務操作資料庫寫入 ( golang / sql )

  • 分享至 

  • xImage
  •  

當我們要確保資料是否有成功insert,除了使用程式邏輯上的Lock控制還會搭配到使用事務執行流程去管控。

透過以下應用舉例,達到控制寫入資料&事務流程設計。 過程註解寫在代碼中/images/emoticon/emoticon33.gif

  • 程式語言

Golang

  • 流程模擬

假設有5筆資料要做寫入,流程上如何達到(寫語法)與(執行語法)錯開的狀況,並透過事務流程完成提交。

  • 流程設計

組語法部分 : 每5s取一筆數據去組insert into .... values (...)的values資料。
執行寫入DB部分: 每10s觸發一次提交數據。

先看程式執行完成LOG:

https://ithelp.ithome.com.tw/upload/images/20210913/20130880AkiZ367mqE.png


昨天先建好的資料:

mysql> select * from user_lists;
+---------+---------+-------+-----------------+-------------+
| user_id | account | level | last_login_time | create_time |
+---------+---------+-------+-----------------+-------------+
|  100001 | siang05 |     1 |      1630385990 |  1330385990 |
|  100002 | siang01 |     1 |      1630385990 |  1330385990 |
|  100003 | siang02 |     1 |      1630385990 |  1330385990 |
|  100004 | siang03 |     1 |      1630385990 |  1330385990 |
|  100005 | siang04 |     1 |      1630385990 |  1330385990 |
+---------+---------+-------+-----------------+-------------+
5 rows in set (0.00 sec)

  • 溫馨提醒:
    部分程式邏輯在於個人設計,為了方便呈現。在實際運用上要做修改喔/images/emoticon/emoticon31.gif
    ps. 複製貼上比較好觀看
package main

import (
	"database/sql"
	"fmt"
	"sync"
	"time"

	_ "github.com/go-sql-driver/mysql"
)

func main() {

	//Init connect object.  content-> account:password@tcp(host_ip:port)/db_name
	dbu, err := sql.Open("mysql", "root:1234@tcp(127.0.0.1:3306)/user?charset=utf8&parseTime=True")
	if err != nil {
		fmt.Println("open mysql error", err)
		return
	}

	//close conn.
	defer dbu.Close()

	//Create mysql connect.
	err = dbu.Ping()
	if err != nil {
		fmt.Println("create mysql connect  error", err)
		return
	}

	/*
	  >以下連接池設定
	      注意點:
	   (1.)mysql連線預設保存時間為8小時,閒置超過8小時會被mysql斷開變失效連線。
	    查詢: show variables like '%wait_timeout%';

	    Q: 使用到被mysql斷開的連線會產生ERROR -> packets.go:36: unexpected EOF

	   (2.)mysql最大連線數。
	    查詢: show variables like '%max_connections%';

	*/

	//設置最大併發連線數,超過連線數需等待,直到其中連接被釋放並變為空閒。
	dbu.SetMaxOpenConns(100)
	//設置最大的空閒連接數,適當的設定空閒連接數(會佔用內存)將提高性能,減少從頭建立新連接的可能。
	dbu.SetMaxIdleConns(10)
	//設置連線的生命週期,過期後無法重用。
	dbu.SetConnMaxLifetime(30)

	/*
		以下內容單純舉例大致流程

		重點: 資料寫入的流程 or 如果寫入發生中斷造成失敗要怎麼處理後續步驟~ 在於個人設計了...舉例上沒有多做這部分邏輯處理。

		流程上模擬: 今天有5筆資料要做寫入,流程上如何達到(寫語法)與(執行語法)錯開的狀況,並透過事務流程完成提交。

		在整個流程上 => 組語法部分    : 每5s取一筆數據組insert into .... values (...) 的values資料。
		              執行寫入DB部分: 每10s觸發一次提交數據。

	*/

	//假設有5筆資料要做寫入 (寫入資料來源)
	chanBuf := make(chan string, 5)
	chanBuf <- fmt.Sprintf("(%d,'%s',%d,%d,%d),", 100002, "siang01", 1, 1630385990, 1330385990)
	chanBuf <- fmt.Sprintf("(%d,'%s',%d,%d,%d),", 100003, "siang02", 1, 1630385990, 1330385990)
	chanBuf <- fmt.Sprintf("(%d,'%s',%d,%d,%d),", 100004, "siang03", 1, 1630385990, 1330385990)
	chanBuf <- fmt.Sprintf("(%d,'%s',%d,%d,%d),", 100005, "siang04", 1, 1630385990, 1330385990)
	chanBuf <- fmt.Sprintf("(%d,'%s',%d,%d,%d),", 100001, "siang05", 1, 1630385990, 1330385990)

	fmt.Printf("目前等待寫入筆數: %d\n", len(chanBuf))

	go GetSqlData(chanBuf)
	go ExecTransaction(dbu)

	//阻塞主線程防提早執行完,確認5筆資料都完成才退出。
	for {
		if checker == 5 {
			fmt.Printf("All finish")
			break
		}
	}
}

var locks sync.Mutex
var values string //組語法值的部分
var checker int64 //檢查筆數

//GetSqlData 組合寫入資料內容
func GetSqlData(data chan string) {
	for {

		//為了能看出流程怎麼走才加的
		time.Sleep(5 * time.Second)

		//讀取data
		info := <-data

		//防止定時執行寫入機制觸發時還在組寫入資料
		locks.Lock()
		values += info
		locks.Unlock()
	}
}

//RollbackTX 回滾事務釋放連線
func RollbackTX(tx *sql.Tx) error {
	err := tx.Rollback()
	if err != sql.ErrTxDone && err != nil {
		fmt.Println("[ERROR] tx rollback error", err)
		return err
	}
	return nil
}

//ExecTransaction 執行事務提交流程
func ExecTransaction(dbu *sql.DB) {

	//INSERT組資料的來源看個人設計, 這邊locks機制是為了確保組合SQL的部分不會再有資料寫入
	insert := "INSERT INTO user_lists(user_id,account,level,last_login_time,create_time) values"

	//設置計時器每10秒觸發一次寫入流程
	ticker := time.NewTicker(10 * time.Second)
	for range ticker.C {

		if values != "" {

			//處理語法字串的最後一個逗點變成結束符號(, -> ;)
			if values[len(values)-1:] == "," {
				values = values[0:len(values)-1] + ";"
			}

			//完整寫入語法
			sql := insert + values
			fmt.Println("[READY] 本次執行語法: ", sql)

			fmt.Println("[START] 進入Transaction 流程")

			//lock -> 禁止values繼續寫入新資料
			locks.Lock()

			//進入交易模式
			fmt.Println("[START] 進入Begin 流程")

			tx, beginErr := dbu.Begin()
			if beginErr != nil {

				if errs := RollbackTX(tx); errs != nil {
					fmt.Println("[ERROR] Begin rollback error: ", errs)
				}
				fmt.Println("[ERROR] Begin error: ", beginErr)
				values = ""
				locks.Unlock()
				continue
			}

			//執行insert動作
			fmt.Println("[START] 進入Exec SQL 流程")

			rows, execErr := tx.Exec(sql)
			if execErr != nil {

				if errs := RollbackTX(tx); errs != nil {
					fmt.Println("[ERROR] Exec rollback error: ", errs)
				}
				fmt.Println("[ERROR] Exec error: ", execErr)
				values = ""
				locks.Unlock()
				continue
			}

			//提交事務
			fmt.Println("[START] 進入Commit 流程")

			if CommitErr := tx.Commit(); CommitErr != nil {

				if errs := RollbackTX(tx); errs != nil {
					fmt.Println("[ERROR] Commit rollback error: ", errs)
				}
				fmt.Println("[ERROR] Commit error: ", CommitErr)
				values = ""
				locks.Unlock()
				continue
			}

			//檢查筆數目前已經到幾筆了(返回筆數)
			check, _ := rows.RowsAffected()
			fmt.Printf("[LOG] 目前完成筆數: %d\n", checker+check)

			checker += check

			//將寫入成功的數據清除
			values = ""
			//解鎖-> values可以繼續寫入
			locks.Unlock()

			fmt.Println("[OK] 完成本次作業")
		}
	}
}

明天來介紹mysql重要的索引 !! /images/emoticon/emoticon37.gif


上一篇
Day.16 應用中學習- 資料庫操作 ( golang / sql )
下一篇
Day.18 InnoDB資料儲存 - 主索引架構 (Clustered Index)
系列文
MYSQL-相關實務操作學習紀錄30
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言