zankv 分析
目录介绍
├── Godeps godep文件
├── Makefile Makefile文件
├── README.md ReadMe文件
├── apps 应用目录
│ ├── placedriver
│ │ └── main.go
│ └── zankv zankv主目录
│ ├── main.go
├── build-pb.sh pb编译脚本
├── cluster 集群相关
│ ├── common.go
│ ├── coordinator_stats.go
│ ├── datanode_coord 数据协调
│ │ └── data_node_coordinator.go
│ ├── pdnode_coord
│ │ ├── pd_api.go
│ │ ├── pd_coordinator.go
│ │ └── place_driver.go
│ ├── register.go
│ ├── register_etcd.go
│ └── util.go
├── common 公共目录
│ ├── api_request.go
│ ├── api_response.go
│ ├── binary.go
│ ├── file_sync.go
│ ├── file_sync_test.go
│ ├── listener.go
│ ├── logger.go
│ ├── stats.go
│ ├── type.go
│ └── util.go
├── default.conf 默认配置
├── default2.conf 默认配置
├── dist.sh
├── node node相关
│ ├── config.go
│ ├── hash.go
│ ├── keys.go
│ ├── kvstore.go
│ ├── list.go
│ ├── multi.go
│ ├── namespace.go
│ ├── node.go
│ ├── raft.go
│ ├── raft_internal.pb.go
│ ├── raft_internal.proto
│ ├── raft_storage.go
│ ├── raft_test.go
│ ├── scan.go
│ ├── set.go
│ ├── util.go
│ └── zset.go
├── pdserver
│ ├── config.go
│ ├── http.go
│ ├── pdconf.example
│ └── server.go
├── pre-dist.sh
├── put_test.sh
├── raft raft相关
│ ├── README.md
│ ├── design.md
│ ├── diff_test.go
│ ├── doc.go
│ ├── example_test.go
│ ├── log.go
│ ├── log_test.go
│ ├── log_unstable.go
│ ├── log_unstable_test.go
│ ├── logger.go
│ ├── node.go
│ ├── node_bench_test.go
│ ├── node_test.go
│ ├── progress.go
│ ├── progress_test.go
│ ├── raft.go
│ ├── raft_flow_control_test.go
│ ├── raft_paper_test.go
│ ├── raft_snap_test.go
│ ├── raft_test.go
│ ├── raftpb
│ │ ├── raft.pb.go
│ │ └── raft.proto
│ ├── rawnode.go
│ ├── rawnode_test.go
│ ├── read_only.go
│ ├── status.go
│ ├── storage.go
│ ├── storage_test.go
│ ├── util.go
│ └── util_test.go
├── rockredis rocksdb实现的redis
│ ├── const.go
│ ├── doc.go
│ ├── encode.go
│ ├── encode_darwin.go
│ ├── iterator.go
│ ├── rockredis.go
│ ├── rockredis_test.go
│ ├── scan.go
│ ├── t_hash.go
│ ├── t_hash_test.go
│ ├── t_kv.go
│ ├── t_kv_test.go
│ ├── t_list.go
│ ├── t_list_test.go
│ ├── t_set.go
│ ├── t_set_test.go
│ ├── t_table.go
│ ├── t_zset.go
│ ├── t_zset_test.go
│ └── util.go
├── rsyncd.conf
├── server server
│ ├── config.go
│ ├── httpapi.go
│ ├── merge.go
│ ├── redis_api.go
│ ├── redis_api_test.go
│ ├── server.go
│ ├── server_test.go
│ └── util.go
├── snap snap
│ ├── db.go
│ ├── message.go
│ ├── metrics.go
│ ├── snappb
│ │ ├── snap.pb.go
│ │ └── snap.proto
│ ├── snapshotter.go
│ └── snapshotter_test.go
├── start_cluster.sh
├── stats 状态
│ ├── leader.go
│ ├── queue.go
│ ├── server.go
│ └── stats.go
├── tools 工具
│ └── bench
│ └── main.go
├── transport raft协议
│ └── rafthttp
│ ├── coder.go
│ ├── doc.go
│ ├── fake_roundtripper_test.go
│ ├── functional_test.go
│ ├── http.go
│ ├── http_test.go
│ ├── metrics.go
│ ├── msg_codec.go
│ ├── msg_codec_test.go
│ ├── msgappv2_codec.go
│ ├── msgappv2_codec_test.go
│ ├── peer.go
│ ├── peer_status.go
│ ├── peer_test.go
│ ├── pipeline.go
│ ├── pipeline_test.go
│ ├── probing_status.go
│ ├── remote.go
│ ├── snapshot_sender.go
│ ├── snapshot_test.go
│ ├── stream.go
│ ├── stream_test.go
│ ├── transport.go
│ ├── transport_bench_test.go
│ ├── transport_test.go
│ ├── urlpick.go
│ ├── urlpick_test.go
│ ├── util.go
│ └── util_test.go
├── wal wal
│ ├── decoder.go
│ ├── doc.go
│ ├── encoder.go
│ ├── file_pipeline.go
│ ├── metrics.go
│ ├── record_test.go
│ ├── repair.go
│ ├── repair_test.go
│ ├── util.go
│ ├── wal.go
│ ├── wal_bench_test.go
│ ├── wal_test.go
│ ├── wal_unix.go
│ ├── wal_windows.go
│ └── walpb
│ ├── record.go
│ ├── record.pb.go
│ └── record.proto
└── yz_cp_simple
启动流程分析
zankv启动入口在 apps/zankv/main.go里.其中:program 实现了Service接口
Service接口定义如下:
type Service interface {
// Init is called before the program/service is started and after it's
// determined if the program is running as a Windows Service.
Init(Environment) error
// Start is called after Init. This method must be non-blocking.
Start() error
// Stop is called in response to syscall.SIGINT, syscall.SIGTERM, or when a
// Windows Service is stopped.
Stop() error
}
main函数中调用了svc.Run函数,Run函数中会分别调用Service接口的Init, Start, Stop函数。
对于zankv来说,主要启动逻辑在main.go中的Start()函数。
main.go#program::Start
Start函数首先读取配置文件,配置结构体如下:
type ServerConfig struct {
// this cluster id is used for server transport to tell
// different global cluster
ClusterID string `json:"cluster_id"`
EtcdClusterAddresses string `json:"etcd_cluster_addresses"`
BroadcastInterface string `json:"broadcast_interface"`
BroadcastAddr string `json:"broadcast_addr"`
RedisAPIPort int `json:"redis_api_port"`
HttpAPIPort int `json:"http_api_port"`
ProfilePort int `json:"profile_port"`
DataDir string `json:"data_dir"`
DataRsyncModule string `json:"data_rsync_module"`
LocalRaftAddr string `json:"local_raft_addr"`
Tags []string `json:"tags"`
ElectionTick int `json:"election_tick"`
TickMs int `json:"tick_ms"`
// default rocksdb options, can be override by namespace config
RocksDBOpts rockredis.RockOptions `json:"rocksdb_opts"`
Namespaces []NamespaceNodeConfig `json:"namespaces"`
MaxScanJob int `json:"max_scan_job"`
}
type NamespaceNodeConfig struct {
Name string `json:"name"`
LocalReplicaID uint64 `json:"local_replica_id"`
}
type ConfigFile struct {
ServerConf ServerConfig `json:"server_conf"`
}
根据配置,创建新的Server:
app := server.NewServer(serverConf)
如果配置文件中配置了namespace,那么根据配置来读取namespace,并初始化。
根据配置文件中namespace中的Name来读取namespace内容,读取namespace的内容后,调用
app.InitKVNamespace
启动server:
app.Start()
server/server.go#Server::Start
该函数,首先调用了rafthttp.Transport::Start来启动Raft,然后调用serveRaft. 根据配置(EtcdClusterAddresses是否为"",来创建数据协调器)来判断是启动数据协调器还是启动node的namespaceManager
if self.dataCoord != nil {
err := self.dataCoord.Start()
if err != nil {
sLog.Fatalf("data coordinator start failed: %v", err)
}
} else {
self.nsMgr.Start()
}
接着启动Redis协议服务器,以及http协议服务器
go func() {
defer self.wg.Done()
self.serveRedisAPI(self.conf.RedisAPIPort, self.stopC)
}()
go func() {
defer self.wg.Done()
self.serveHttpAPI(self.conf.HttpAPIPort, self.stopC)
}()