Python之psutil库使用

Python psutil库的介绍:

  • 实际上psutil就是为了监控而生的库.因为其强大在于多平台兼容(Unix, Windows下都可以使用)
  • 能够监控CPU、内存、硬盘、进程等等.

开头唠叨两句

在使用中有任何问题,可以反馈给我,以下联系方式跟我交流

  • Author: Leo
  • Wechat: Leo-sunhailin
  • QQ: 379978424
  • E-mail: 379978424@qq.com

介绍

  • 开发介绍:

    • psutil库是一个能够查看系统信息的库,可以用来做监控(运维用的最多)

    • psutil模块获取系统的基础信息,包括cpu、内存、磁盘io和网络io和查看系统进程信息

    • 其中进程的方法中有psutil.Process()和psutil.Popen()两个方法。

  • 后期更新:

    • 加入前端页面管理(vue + element-ui)
    • Linux的代码
    • 多终端查询

开发环境

  • 系统: Win10 x64

  • Python版本: 3.4.4

    • psutil: 5.4.1

      1
      2
      # 安装
      pip install psutil

使用说明

  • 使用方法
1
2
3
4
# 命令行
python <你命名的py名称>.py -l zh(或者是en)

# -l是选择语言,-h是帮助文档

代码

  • 直接po代码了,目前只写了windows下的,linux的晚点在补充上来

  • Windows

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
# -*- coding: UTF-8 -*-
"""
Created on 2017年11月29日
@author: Leo
"""

# 内部库
import sys
import json
import time
from collections import OrderedDict

# 第三方库
import psutil


# 系统信息查询
class SystemChecker:
def __init__(self, lang):
# 内存字典字段
self.memory_dict_en = OrderedDict()
self.memory_dict_zh = OrderedDict()

# CPU信息字段
self.cpu_dict_en = OrderedDict()
self.cpu_dict_zh = OrderedDict()

# 用户信息
self.user_info_en = OrderedDict()
self.user_info_zh = OrderedDict()

# 进程信息
self.process_info_en = OrderedDict()
self.process_info_zh = OrderedDict()

# 获取当前内存大小
self.memory_result = psutil.virtual_memory()

# 输出语言
self.language = lang

'''
工具方法
'''
# 转换时间戳
@staticmethod
def change_timestamp(timestamp):
time_local = time.localtime(int(timestamp))
return str(time.strftime("%Y-%m-%d %H:%M:%S", time_local))

# 转换容量单位
@staticmethod
def bytes_2_human_readable(number_of_bytes):
"""
转换单位(上限是TB)
:param number_of_bytes:
:return: 返回大小 + 单位(字符串)
"""
if number_of_bytes < 0:
raise ValueError("!!! numberOfBytes can't be smaller than 0 !!!")

step_to_greater_unit = 1024.

number_of_bytes = float(number_of_bytes)
unit = 'bytes'

if (number_of_bytes / step_to_greater_unit) >= 1:
number_of_bytes /= step_to_greater_unit
unit = 'KB'

if (number_of_bytes / step_to_greater_unit) >= 1:
number_of_bytes /= step_to_greater_unit
unit = 'MB'

if (number_of_bytes / step_to_greater_unit) >= 1:
number_of_bytes /= step_to_greater_unit
unit = 'GB'

if (number_of_bytes / step_to_greater_unit) >= 1:
number_of_bytes /= step_to_greater_unit
unit = 'TB'

precision = 1
number_of_bytes = round(number_of_bytes, precision)

return str(number_of_bytes) + unit

'''
核心方法
'''
# 获取容量大小
def get_memory_dict(self):
json_data = {"data": []}
# 总内存
self.memory_dict_en['total'] = \
self.memory_dict_zh['总内存'] = self.bytes_2_human_readable(self.memory_result.total)

# 可用内存
self.memory_dict_en['available'] = \
self.memory_dict_zh['可用内存'] = self.bytes_2_human_readable(self.memory_result.available)

# 百分比
self.memory_dict_en['percent'] = \
self.memory_dict_zh['百分比'] = str(self.memory_result.percent) + "%"

# 已使用内存
self.memory_dict_en['used'] = \
self.memory_dict_zh['已用内存'] = self.bytes_2_human_readable(self.memory_result.used)

# 剩余内存
self.memory_dict_en['free'] = \
self.memory_dict_zh['剩余内存'] = self.bytes_2_human_readable(self.memory_result.free)

if self.language is None or self.language == "zh":
json_data['data'].append(dict(self.memory_dict_zh))
json_data.update(dict(msg="Success"))
return json.dumps(json_data, ensure_ascii=False)
elif self.language == "en":
json_data['data'].append(dict(self.memory_dict_en))
json_data.update(dict(msg="Success"))
return json.dumps(json_data, ensure_ascii=False)
else:
raise ValueError("Unsupported language!")

# 获取CPU信息
def get_cpu_info(self):
json_data = {"data": []}
# CPU核数
self.cpu_dict_en['CPU Core Count'] = \
self.cpu_dict_zh['CPU核数'] = str(psutil.cpu_count(logical=False))

# CPU线程数
self.cpu_dict_en['CPU Thread Count'] = \
self.cpu_dict_zh['CPU线程数'] = str(psutil.cpu_count())

if self.language is None or self.language == "zh":
json_data['data'].append(dict(self.cpu_dict_zh))
json_data.update(dict(msg="Success"))
return json.dumps(json_data, ensure_ascii=False)
elif self.language == "en":
json_data['data'].append(dict(self.cpu_dict_en))
json_data.update(dict(msg="Success"))
return json.dumps(json_data, ensure_ascii=False)
else:
raise ValueError("Unsupported language!")

# 获取本机用户时间和启动时间
def get_user_start_time(self):
json_data = {"data": []}
# 当前用户名
self.user_info_en['name'] = \
self.user_info_zh['用户名'] = psutil.users()[0].name

# 系统启动时间
self.user_info_en['system_start_time'] = \
self.user_info_zh['系统启动时间'] = self.change_timestamp(psutil.users()[0].started)

if self.language is None or self.language == "zh":
json_data['data'].append(dict(self.user_info_zh))
json_data.update(dict(msg="Success"))
return json.dumps(json_data, ensure_ascii=False)
elif self.language == "en":
json_data['data'].append(dict(self.user_info_en))
json_data.update(dict(msg="Success"))
return json.dumps(json_data, ensure_ascii=False)
else:
raise ValueError("Unsupported language!")

# 获取进程信息
def get_process_info(self):
# 创建一个json
json_data = {"data": []}
for pid in psutil.pids():
try:
# 进程号
self.process_info_en['PID'] = \
self.process_info_zh['进程号'] = pid

# 进程名
self.process_info_en['Name'] = \
self.process_info_zh['进程名'] = psutil.Process(pid).name()

# 进程状态
self.process_info_en['Status'] = \
self.process_info_zh['进程状态'] = psutil.Process(pid).status()

# 进程内存占用率
self.process_info_en['Percent'] = \
self.process_info_zh['进程内存使用率'] = str(round(psutil.Process(pid).memory_percent(), 3)) + "%"

# 进程内存占用大小
self.process_info_en['MemoryUsed'] = \
self.process_info_zh['进程内存占用'] = \
self.bytes_2_human_readable((psutil.Process(pid).memory_percent() / 100) * self.memory_result.total)

# 写入json数组
if self.language is None or self.language == "zh":
json_data['data'].append(dict(self.process_info_zh))
elif self.language == "en":
json_data['data'].append(dict(self.process_info_en))
else:
raise ValueError("Unsupported language!")
except psutil.NoSuchProcess:
continue
# 状态
json_data.update(dict(msg="Success"))

# 返回
return json.dumps(json_data, ensure_ascii=False)

# 获取全部信息
def get_all(self):
# 获取信息
memory_info = json.loads(self.get_memory_dict())
cpu_info = json.loads(self.get_cpu_info())
user_info = json.loads(self.get_user_start_time())
process_info = json.loads(self.get_process_info())

# 输出
print("---\t内存使用详情(Memory used info)\t---")
for memory_value in memory_info['data']:
print(memory_value)

print("\n")

print("---\tCPU详情(CPU used info)\t---")
for cpu_value in cpu_info['data']:
print(cpu_value)

print("\n")

print("---\t本机用户详情(Computer user info)\t---")
for user_value in user_info['data']:
print(user_value)

print("\n")

print("---\t进程详情(Process info)\t---")
for process in process_info['data']:
print(process)


# 帮助文档
def help_center():
print("Options and arguments (and corresponding environment variables):")
print("-l\t:this parameter need a language type, like zh or en.\n\t Chinese and English.")
print("-h\t:print this help message and exit (also --help)")


if __name__ == '__main__':
try:
# 获取参数(操作和语言类型)
options = sys.argv[1:]
# 如果参数为 -l
if options[0] == "-l":
# 如果参数
language = options[1::1]
if language[0] != "zh" and language[0] != "en":
raise ValueError("Language parameter is error!")
else:
m = SystemChecker(lang=language[0])
m.get_all()
# 如果参数为 -h
elif options[0] == "-h" or options[0] == "--help":
help_center()
# 误操作
else:
help_center()
print("\n")
raise ValueError("You give wrong option! You need read the doc.")
except IndexError:
print("You miss 1 argument!")
help_center()

文章作者: sunhailin-Leo
文章链接: http://www.leoyuki.xyz/2017/12/13/PythonPsutils/
版权声明: 本博客所有文章除特别声明外,均采用 CC BY-NC-SA 3.0 许可协议。转载请注明来自 LeoBlog