中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

怎么連接Redis服務器

發布時間:2021-12-24 16:24:16 來源:億速云 閱讀:162 作者:iii 欄目:互聯網科技

這篇文章主要介紹“怎么連接Redis服務器”,在日常操作中,相信很多人在怎么連接Redis服務器問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”怎么連接Redis服務器”的疑惑有所幫助!接下來,請跟著小編一起來學習吧!

1、開發與運行環境概述

首先要滿足以下環境要求:

  • Docker: Docker已經成為新應用開發的必備工具,它使得應用的構建、分享與部署都極其簡單。

  • Docker Compose:我們使用Docker Compose來管理所有的服務,以便輕松地進行擴展。

其他的需求都由Docker鏡像來滿足,我們不需要安裝其他任何東西了,只需要寫一個簡單的Docker Compos配置文檔 —— docker-compose.yml:

version: '3'
services:
  ganache:
    image: trufflesuite/ganache-cli
    command: -m
  redis:
    image: redis:alpine
    ports:
      - "6379:6379"
    command: redis-server --appendonly yes
    volumes:
      - redis:/data
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
  kafka:
    image: wurstmeister/kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1
      KAFKA_CREATE_TOPICS: "command:1:1,address.created:1:1,transaction:1:1,errors:1:1"
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181

volumes:
  redis:

只要運行docker-compose up -d就可以輕松地啟動服務,這個命令會自動從Docker中心下載必要的鏡像,然后啟動。下面讓我們看看都有哪些服務。

1.1 Ganache-cli

如果沒有接入以太坊區塊鏈的節點,我們的錢包服務就不會有什么用。在開發期我們不需要下載整個以太坊區塊鏈,因此只要使用Ganache仿真器即可。使用Ganache的好處是開發效率高,因為出塊極快。不過在生產環境中就需要使用像Geth這樣的節點軟件來接入以太坊主網了。

1.2 Redis

我們需要數據庫來保存我們創建的地址,并且監聽這些地址相關的交易。Redis是一個很出色的內存鍵/值數據庫,非常適合我們的應用場景。

在這個教程中,我們將使用Redis數據庫來保存我們為地址生成的私鑰,但是在生產服務器上應當使用更安全的硬件設施來保護這些私鑰。

1.3 Kafka/Zookeeper

Apache Kafka在交易所架構中扮演著核心的角色,它負責接收所有服務的消息并分發給訂閱這些消息的節點。

對于以太坊錢包服務而言,我們將使用以下這些主題進行通信:

  • command

  • address.created

  • transaction

  • errors

Apache Kafka服務器可以獨立地進行擴展,為我們的服務提供了一個分布式的消息處理集群。

2、開發語言選擇

就我個人而言,是非常喜歡Elixir的,因為可以用它寫出極其可靠的分布式應用,而且代碼也很容易理解和維護。但是考慮到以太坊的生態,Elixir就沒有什么優勢了。

對于以太坊開發而言最好的選擇還是使用Node.js/Javascript。因為有很多你可以直接就用的組件。因此我們的以太坊錢包服務最終決定使用Node.js開發。

3、初始開發環境搭建

首先運行npm init命令來創建默認的node包:

~/exchange-hubwiz/eth-wallet$ npm init

然后我們可以添加一些錢包服務要用到的node依賴包,執行如下命令:

~/exhcange-hubwiz/eth-wallet$ npm install --save web3 redis kafka-node ethereumjs-tx bluebird

前三個依賴包的作用容易理解:

  • web3:通過websocket連接到Ganache或其他以太坊節點

  • redis:連接到Redis服務器以便保存或提取數據

  • kafka-node:接入Zookeeper,獲取Kafka訪問端結點,生產或消費Kafka消息

最后的兩個依賴包有助于讓我們的代碼更容易理解,并且可以利用async/await的異步編程模式的優勢。

接下來,我們將利用這些node包連接Redis、以太坊和Kafka服務器。

4、連接服務器

4.1 連接Redis服務器

連接Redis非常簡單,創建一個redis.js文件,然后編寫如下代碼:

// load configuration
const config = require('../../config')
const redis = require('redis')
const bluebird = require('bluebird')

// promisify the redis client using bluebird
bluebird.promisifyAll(redis.RedisClient.prototype);
bluebird.promisifyAll(redis.Multi.prototype);

// create a new redis client and connect to the redis instance
const client = redis.createClient(config.redis_port, config.redis_host);

// if an error occurs, print it to the console
client.on('error', function (err) {
  console.error("[REDIS] Error encountered", err)
})

module.exports = client;

4.2 連接以太坊節點

如果你認為連接Redis很簡單了,那么使用web3連接以太坊節點簡單的會讓你吃驚。 創建一個ethereum.js,然后編寫如下代碼:

const config = require('../../config')
const Web3 = require('web3')
module.exports = new Web3(config.uri)

4.3 連接Kafka服務器

Kafka,需要從隊列中提取消息進行消費,或者生產消息存入隊列。因此我們也需要繼續相關的配置。

創建一個新的文件query.js,然后編寫如下的代碼:

const kafka = require('kafka-node')
const config = require('../../config')

// configure how the consumers should connect to the broker/servers
// each consumer creates his own connecto to a broker

const default_options = {
  host: config.kafka_zookeeper_uri,
  autoCommit: true,
  fromOffset: 'earliest',
}

module.exports.consumer = (group_id = "ethereum_wallet_manager_consumer", topics = [], opts = {}) => {
  const options = Object.assign({ groupId: group_id }, default_options, opts)
  const consumer = new kafka.ConsumerGroup(options, topics)
  return consumer
}

// configure how the producer connects to the Apache Kafka broker

// initiate the connection to the kafka client
const client = new kafka.Client(config.kafka_zookeeper_uri, config.kafka_client_id)
module.exports.client = client
const producer = new kafka.Producer(client)

// add a listener to the ready event
async function on_ready(cb) {
  producer.on('ready', cb)
}

// define a method to send multiple messages to the given topic
// this will return a promise that will resolve with the response from Kafka
// messages are converted to JSON strings before they are added in the queue
async function send(topic, messages) {
  return new Promise((resolve, reject) => {
    // convert objects to JSON strings
    messages = messages.map(JSON.stringify)
    // add the messages to the given topic
    producer.send([{ topic, messages}], function (err, data) {
      if (err) return reject(err)
      resolve(data)
    })
  })
}

// expose only these methods to the rest of the application and abstract away
// the implementation of the producer to easily change it later
module.exports.on_ready = on_ready
module.exports.send = send

5、打造以太坊錢包服務

現在我們開始進入以太坊錢包服務的核心特性開發階段。

5.1 創建新的以太坊賬戶

交易所和支付網關需要為客戶生成新地址,以便用戶可以向服務充值,或者為產品付費。生成一個沒有用過的以太坊地址是任何虛擬貨幣服務的基本需求,因此讓我們看看如何實現。

首先,創建一個commands.js,在其中我們訂閱隊列中的消息。主要包括以下幾個步驟:

  • 連接到command主題,監聽新的create_account命令

  • 當收到新的create_account命令時,創建新的密鑰對并存入密碼庫

  • 生成account_created消息并發送到隊列的account_created主題

代碼如下:

const web3 = require("./ethereum")
const redis = require('./redis')
const queue = require('./queue')

/**
 * Listen to new commands from the queue
 */
async function listen_to_commands() {
  const queue_consumer = queue.consumer('eth.wallet.manager.commands', ['command'])
  // process messages
  queue_consumer.on('message', async function (topic_message) {
    try {
      const message = JSON.parse(topic_message.value)
      // create the new address with some reply metadata to match the response to the request
      const resp = await create_address(message.meta)
      // if successful then post the response to the queue
      if (resp) {
        await queue_producer.send('address.created', [resp])
      }
    } catch (err) {
      // in case something goes wrong catch the error and send it back in the 'errors' topic
      console.error(topic_message, err)
      queue_producer.send('errors', [{type: 'command', request: topic_message, error_code: err.code, error_message: err.message, error_stack: err.stack}])
    }
  })
  return queue_consumer
}

/**
 * Create a new ethereum address and return the address 
 */
async function create_account(meta = {}) {
  // generate the address
  const account = await web3.eth.accounts.create()
  
  // disable checksum when storing the address
  const address = account.address.toLowerCase()
  
  // save the public address in Redis without any transactions received yet
  await redis.setAsync(`eth:address:public:${address}`, JSON.stringify({}))
  
  // Store the private key in a vault.
  // For demo purposes we use the same Redis instance, but this should be changed in production
  await redis.setAsync(`eth:address:private:${address}`, account.privateKey)
  
  return Object.assign({}, meta, {address: account.address})
}

module.exports.listen_to_commands = listen_to_commands

5.2 處理新交易

我們的錢包還沒寫完,當我們創建的地址收到用戶充值時應當得到通知才對。為此,以太坊的web3客戶端提供了newBlockHeaders訂閱機制。此外,如果我們的服務偶然宕機,那么服務就會錯過在宕機期間生產的區塊,因此我們還需要檢查錢包是否已經同步到了網絡的最新區塊。

創建 sync_blocks.js文件,編寫如下代碼:

const web3 = require('./ethereum')

/**
 * Sync blocks and start listening for new blocks
 * @param {Number} current_block_number - The last block processed
 * @param {Object} opts - A list of options with callbacks for events
 */
async function sync_blocks(current_block_number, opts) {
  // first sync the wallet to the latest block
  let latest_block_number = await web3.eth.getBlockNumber()
  let synced_block_number = await sync_to_block(current_block_number, latest_block_number, opts)

  // subscribe to new blocks
  web3.eth.subscribe('newBlockHeaders', (error, result) => error && console.log(error))
  .on("data", async function(blockHeader) {
    return await process_block(blockHeader.number, opts)
  })

  return synced_block_number
}

// Load all data about the given block and call the callbacks if defined
async function process_block(block_hash_or_id, opts) {
  // load block information by id or hash
  const block = await web3.eth.getBlock(block_hash_or_id, true)
  // call the onTransactions callback if defined
  opts.onTransactions ? opts.onTransactions(block.transactions) : null;
  // call the onBlock callback if defined
  opts.onBlock ? opts.onBlock(block_hash_or_id) : null;
  return block
}

// Traverse all unprocessed blocks between the current index and the lastest block number
async function sync_to_block(index, latest, opts) {
  if (index >= latest) {
    return index;
  }
  await process_block(index + 1, opts)
  return await sync_to_block(index + 1, latest, opts)
}

module.exports = sync_blocks

在上面的代碼中,我們從錢包服務之前處理的最新區塊開始,一直同步到區塊鏈的當前最新區塊。一旦我們同步到最新區塊,就開始訂閱新區塊事件。對于每一個區塊,我們都執行如下的回調函數以處理區塊頭以及區塊中的交易列表:

  • onTransactions

  • onBlock

通常包含如下的處理步驟:

  • 監聽新區塊,獲取區塊中的全部交易

  • 過濾掉與錢包地址無關的交易

  • 將每個相關的交易都發往隊列

  • 將地址上的資金歸集到安全的存儲

  • 更新已處理的區塊編號

最終的代碼如下:

const web3 = require("web3")
const redis = require('./redis')
const queue = require('./queue')
const sync_blocks = require('./sync_blocks')

/**
 * Start syncing blocks and listen for new transactions on the blockchain
 */
async function start_syncing_blocks() {
  // start from the last block number processed or 0 (you can use the current block before deploying for the first time)
  let last_block_number = await redis.getAsync('eth:last-block')
  last_block_number = last_block_number || 0
  // start syncing blocks
  sync_blocks(last_block_number, {
    // for every new block update the latest block value in redis
    onBlock: update_block_head,
    // for new transactions check each transaction and see if it's new
    onTransactions: async (transactions) => {
      for (let i in transactions) {
        await process_transaction(transactions[i])
      }
    }
  })
}

// save the lastest block on redis
async function update_block_head(head) {
  return await redis.setAsync('eth:last-block', head)
}

// process a new transaction
async function process_transaction(transaction) {
  const address = transaction.to.toLowerCase()
  const amount_in_ether = web3.utils.fromWei(transaction.value)

  // check if the receiving address has been generated by our wallet
  const watched_address = await redis.existsAsync(`eth:address:public:${address}`)
  if (watched_address !== 1) {
    return false
  }

  // then check if it's a new transaction that should be taken into account
  const transaction_exists = await redis.existsAsync(`eth:address:public:${address}`)
  if (transaction_exists === 1) {
    return false
  }

  // update the list of transactions for that address
  const data = await redis.getAsync(`eth:address:public:${address}`)
  let addr_data = JSON.parse(data)
  addr_data[transaction.hash] = {
    value: amount_in_ether
  }

  await redis.setAsync(`eth:address:public:${address}`, JSON.stringify(addr_data))
  await redis.setAsync(`eth:transaction:${transaction.hash}`, transaction)
  
  // move funds to the cold wallet address
  // const cold_txid = await move_to_cold_storage(address, amount_in_ether)
  
  // send notification to the kafka server
  await queue_producer.send('transaction', [{
    txid: transaction.hash,
    value: amount_in_ether,
    to: transaction.to,
    from: transaction.from,
    //cold_txid: cold_txid,
  }])

  return true
}

module.exports = start_syncing_blocks

到此,關于“怎么連接Redis服務器”的學習就結束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學習,快去試試吧!若想繼續學習更多相關知識,請繼續關注億速云網站,小編會繼續努力為大家帶來更多實用的文章!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

厦门市| 牡丹江市| 安宁市| 田林县| 乐平市| 松滋市| 西畴县| 大丰市| 临湘市| 从化市| 徐州市| 仲巴县| 昌江| 巨野县| 新泰市| 舟山市| 西林县| 海安县| 松滋市| 民权县| 晋宁县| 龙陵县| 隆回县| 永定县| 临泉县| 合江县| 玉门市| 柘荣县| 桦川县| 东宁县| 尚义县| 荔浦县| SHOW| 米林县| 阳城县| 龙州县| 贺兰县| 邹城市| 进贤县| 民权县| 桂林市|