subgraphs > Querying > 分布式系统

分布式系统

Reading time: 7 min

Graph 是分布式系统实现的协议。

连接可能失败。 请求无序到达。 具有不同步时钟和状态的很多计算机在处理相关请求。 服务器重新启动。 重组发生在请求之间。以上这些问题是所有分布式系统所固有的,但在全球范围内运行的系统中会更加严重。

考虑这个例子,如果客户端在重组期间轮询索引人以获取最新数据,可能会发生什么。

  1. 索引人获取区块 8
  2. 区块 8 的数据传送给客户端
  3. 索引人获取区块 9
  4. 索引人获取区块 10A
  5. 区块 10A 的数据传送给客户端
  6. 索引人检测到区块重组到 10B 并回转到 10A
  7. 区块 9 的数据传送给客户端
  8. 索引人获取区块 10B
  9. 索引人获取区块 11
  10. 区块 11 的数据传送给客户端

从索引人的角度来看,事情进展得完全合乎逻辑。 时间不断前行,尽管我们确实必须回滚一个相邻区块,并在共识下处理该区块。 在此过程中,索引人使用它当时知道的最新状态为请求提供服务。

然而,从客户的角度来看,事情却显得很混乱。 客户端观察到响应依次是区块 8、10、9 和 11。 我们称之为“区块摆动”问题。 当客户端遇到区块摆动时,数据可能会随着时间的推移而自相矛盾。 当考虑到索引人不会同时获取最新区块,并且您的请求可能会被路由到多个索引人的时候,情况会变得更糟。

客户端和服务器有责任协同工作,以便向用户提供一致的数据。 根据所需的一致性目标的不同,我们必须使用不同的方法,因为没有一个适合所有问题的完美解决方案。

通过分布式系统的影响进行推理是困难的,但修复可能不是! 我们已经建立了一些 API 和模式来帮助您解决一些常见的用例。 以下示例说明了这些模式,但仍然省略了生产代码所需的细节(如错误处理和消除),以免混淆主要思想。

轮询更新的数据

链到本节

Graph 提供 block: { number_gte: $minBlock } API,确保响应是针对等于或高于 $minBlock 的单个区块。 如果向 graph-node 实例发出请求并且最小区块尚未同步,则 graph-node 将返回错误。 如果 graph-node 已同步最小区块,它将返回最新区块的响应。 如果请求是发给 Edge & Node 网关的,网关将过滤掉任何尚未同步最小区块的索引人,并请求索引人已同步的最新区块。

我们可以使用 number_gte, 从而确保在循环中轮询数据时,时间不会倒流。 这是一个例子:

/// Updates the protocol.paused variable to the latest
/// known value in a loop by fetching it using The Graph.
async function updateProtocolPaused() {
// It's ok to start with minBlock at 0. The query will be served
// using the latest block available. Setting minBlock to 0 is the
// same as leaving out that argument.
let minBlock = 0
for (;;) {
// Schedule a promise that will be ready once
// the next Ethereum block will likely be available.
const nextBlock = new Promise((f) => {
setTimeout(f, 14000)
})
const query = `
query GetProtocol($minBlock: Int!) {
protocol(block: { number_gte: $minBlock } id: "0") {
paused
}
_meta {
block {
number
}
}
}`
const variables = { minBlock }
const response = await graphql(query, variables)
minBlock = response._meta.block.number
// TODO: Do something with the response data here instead of logging it.
console.log(response.protocol.paused)
// Sleep to wait for the next block
await nextBlock
}
}

获取一组相关项目

链到本节

另一个用例是检索一个更大的集合,或者更一般地说,跨多个请求检索相关项目。 与轮询案例(所需的一致性是及时向前进行)不同,此用例所需的一致性是针对单个时间点的。

在这里,我们将使用 block: { hash: $blockHash } 参数将我们所有的结果锚定到同一个区块。

/// Gets a list of domain names from a single block using pagination
async function getDomainNames() {
// Set a cap on the maximum number of items to pull.
let pages = 5
const perPage = 1000
// The first query will get the first page of results and also get the block
// hash so that the remainder of the queries are consistent with the first.
const listDomainsQuery = `
query ListDomains($perPage: Int!) {
domains(first: $perPage) {
name
id
}
_meta {
block {
hash
}
}
}`
let data = await graphql(listDomainsQuery, { perPage })
let result = data.domains.map((d) => d.name)
let blockHash = data._meta.block.hash
let query
// Continue fetching additional pages until either we run into the limit of
// 5 pages total (specified above) or we know we have reached the last page
// because the page has fewer entities than a full page.
while (data.domains.length == perPage && --pages) {
let lastID = data.domains[data.domains.length - 1].id
query = `
query ListDomains($perPage: Int!, $lastID: ID!, $blockHash: Bytes!) {
domains(first: $perPage, where: { id_gt: $lastID }, block: { hash: $blockHash }) {
name
id
}
}`
data = await graphql(query, { perPage, lastID, blockHash })
// Accumulate domain names into the result
for (domain of data.domains) {
result.push(domain.name)
}
}
return result
}

请注意,在重组的情况下,客户端将需要从第一个请求重试,以将区块hash更新为非相邻区块。

编辑

上页
从应用程序中进行查询
下页
GraphQL API
编辑