官方文档:https://docs.python.org/3/library/index.html
logging
基本的用法,日志信息打印在终端并且同时保存在文件中(运行程序的过程中文件内容会不断增加,不是运行完后一次性写入)
Copy import logging
logname = "xx.py"
filename = "x.log"
logger = logging . getLogger (logname) # logname为可选参数
fh = logging . FileHandler (filename, mode = "w" )
fh . setFormatter (logging. Formatter (fmt = " %(asctime)s %(filename): %(levelname): %(message)s " ))
fh . setLevel (logging.INFO)
logger . addHandler (fh)
ch = logging . StreamHandler (sys.stdout)
ch . setLevel (logging.INFO)
ch . setFormatter (fmt = " %(asctime)s : %(message)s " )
logger . addHandler (ch)
logger . setLevel (logging.INFO)
logger . info ( "xxx" )
控制输出内容,不同的日志文件写不同的内容
Copy import logging
def get_logger ( logger_name , log_file , level = logging . INFO):
logger = logging . getLogger (logger_name)
formatter = logging . Formatter ( ' %(asctime)s : %(message)s ' , "%Y-%m- %d %H:%M:%S" )
fileHandler = logging . FileHandler (log_file, mode = 'w' )
fileHandler . setFormatter (formatter)
logger . setLevel (level)
logger . addHandler (fileHandler)
return logger
log_file1 = '1.log'
log_file2 = '2.log'
# logger_name确保不相同,才会生成两个实例
log1 = get_logger ( 'log1' , log_file1)
log2 = get_logger ( 'log2' , log_file2)
log1 . error ( 'log1: error' )
log2 . info ( 'log2: info' )
备注:要产生不同的logger,要传递不同的logger_name,例如如下情况得到的两个logger会是一样的:
Copy import logging
import sys
import os
def get_logger ( filename ):
os . makedirs (os.path. dirname (os.path. abspath (filename)), exist_ok = True )
logger = logging . getLogger ()
fh = logging . FileHandler (filename, mode = "w" )
fh . setFormatter (logging. Formatter (fmt = " %(asctime)s : %(message)s " ))
fh . setLevel (logging.INFO)
logger . addHandler (fh)
ch = logging . StreamHandler (sys.stdout)
ch . setLevel (logging.INFO)
ch . setFormatter (logging. Formatter (fmt = " %(asctime)s : %(message)s " ))
logger . addHandler (ch)
logger . setLevel (logging.INFO)
return logger
logger = get_logger ( "log.txt" )
logger . info ( "x" )
logger2 = get_logger ( "log2.txt" )
logger2 . info ( "y" )
logger . info ( "z" )
print ( id (logger), id (logger2))
argparse
Copy import argparse
# 若不传某个参数一般情况下为None, 若default被指定, 则为default的值(nargs为"?"时为const的值)
parser = argparse . ArgumentParser ()
# --base 1 表示base=1,不传表示base=21
parser . add_argument ( "-b" , "--base" , type = int , default = 21 )
# --op1 表示op1=2,不传表示op1=None,--op1 20 表示op1=20
# 当nargs指定为"?"时, 默认值用const参数进行指定而非default参数
parser . add_argument ( "--op1" , type = int , nargs = "?" , const = 2 )
# nargs取值可以为整数/"?"/"*"/"+", 分别表示传入固定数量的参数,传入0/1个参数,传入0个或多个参数,传入1个或多个参数
# --a 表示a=True,不传表示a=False
parser . add_argument ( "--a" , action = "store_true" )
# 更一般的,可以自定义一个类继承argparse.Action类,然后将这个自定义类名传入action
# 以下表示--use-a与--use-b至多只能选择一个
group = parser . add_mutually_exclusive_group ()
group . add_argument ( "--use-a" , action = "store_true" , default = False )
group . add_argument ( "--use-b" , action = "store_true" , default = False )
args = parser . parse_args ()
备注:parse_args 函数存在 prefix-match的特性, 具体可参考官方文档 的如下例子:
Copy >>> parser = argparse.ArgumentParser(prog='PROG')
>>> parser.add_argument('-bacon')
>>> parser.add_argument('-badger')
>>> parser.parse_args('-bac MMM'.split())
Namespace(bacon='MMM', badger=None)
>>> parser.parse_args('-bad WOOD'.split())
Namespace(bacon=None, badger='WOOD')
>>> parser.parse_args('-ba BA'.split())
usage: PROG [-h] [-bacon BACON] [-badger BADGER]
PROG: error: ambiguous option: -ba could match -badger, -bacon
有些情况下,可以只解析一部分的命令行参数,而其余参数用其他逻辑进行处理,此时可以使用 parse_known_args
函数。备注:parse_known_args
函数也适用于 prefix-match 规则。
Copy # test.py
import argparse
parser = argparse . ArgumentParser ()
parser . add_argument ( '-c' , '--config' , type = str , required = True )
args , unknown_args = parser . parse_known_args ()
# python test.py -c a.py -d cpu
# args: Namespace(config='a.py'), unknown_args=['-d', 'cpu']
许多工具会包含许多子命令, 例如: git add
, git commit
, git push
, 其中: add/commit/push
都是子命令, argparse
包也能做到这一点:
Copy # xx.py
import argparse
parser = argparse . ArgumentParser ()
parser . add_argument ( '--foo' , action = 'store_true' , help = 'command help' )
subparsers = parser . add_subparsers (help = 'sub-command help' )
# create the parser for the "add" command
parser_a = subparsers . add_parser ( 'add' , help = 'add help' )
parser_a . add_argument ( 'bar' , type = int , help = 'python xx.py add <filename>' )
# create the parser for the "rm" command
parser_b = subparsers . add_parser ( 'rm' , help = 'rm help' )
parser_b . add_argument ( '--filename' , help = 'python xx.py rm --filename <filename>' )
# parse some argument lists
print (parser. parse_args ([ 'add' , '12' ]))
# Namespace(bar=12, foo=False)
print (parser. parse_args ([ '--foo' , 'rm' , '--filename' , 'Z' ]))
# Namespace(filename='Z', foo=True)
# 注意: --foo 必须出现在子命令之前
args = parser . parse_args ()
# Usage: python xx.py [-h] [--foo] {add,rm} ...
print (args)
os
Copy # 求相对路径
os . path . relpath ( "./data/a/b/c.txt" , "./data/a" ) # b/c.txt
os . path . splitext ( "a/b/c.txt" ) # ('a/b/c', '.txt')
os . path . expanduser ( "~/../a.txt" ) # /home/username/../a.txt
# abspath与realpath都不会处理~, realpath会返回软连接指向的位置, abspath只会返回软连接
os . path . abspath ( "~/a.txt" ) # /home/username/temp/~/a.txt
os . path . abspath (os.path. expanduser ( "~/a.txt" )) # /home/username/a.txt
os . path . realpath (os.path. expanduser ( "~/a.link" )) # /home/username/a.txt
# 最为标准的用法
os . path . abspath (os.path. realpath (os.path. expanduser ( "~/a.link" )))
io
open 函数的 mode 参数: r+b
表示可读可写(但只要调用了write, 原本的数据就会被覆盖掉)
read
peek
Copy # https://stackoverflow.com/questions/24474687/io-bufferedreader-peek-function-returning-all-the-text-in-the-buffer
# xyz.txt 的文件内容为: first\nsecond
stream = io . BufferedReader (io. FileIO ( 'xyz.txt' ), buffer_size = 4 )
stream . peek ( 8 ) # b'firs' # buffer中一共4个字节, 当前文件指针在初始位置0, peek 读取从文件指针到缓冲区结束的内容, 但不移动文件指针
stream . read ( 3 ) # b'fir' # buffer中一共4个字节, 当前文件指针在初始位置0, read 读取3个字节后把指针位置往后挪至3
stream . peek ( 8 ) # b's' # buffer中一共4个字节, 当前文件指针在3, peek 读取从文件指针到缓冲区结束的内容, 但不移动文件指针, 因此只读1个字节
stream . read ( 1 ) # b's' # buffer中一共4个字节, 当前文件指针在3, read 读取1个字节后把指针挪至4, 此时缓冲区被刷新为新的4个字节
stream . peek ( 8 ) # b't\nse' # buffer中一共4个字节, 当前文件指针在4, peek 读取从文件指针到缓冲区结束的内容, 但不移动文件指针, 因此读4个字节
# 总之, peek 方法可能获取的字节数多于或少于制定的参数, 取决于 buffer 大小以及当前文件指针的位置
stream . peek (num) [ : num] # 保证最多获取 num 个字节
seek
Copy stream . read ( 3 )
# 定义的常量: os.SEEK_SET: 0, os.SEEK_CUR: 1, os.SEEK_END: 2
stream . seek ( 2 , 1 ) # 返回 3+2=5
collections
defaultdict
递归定义 defaultdict
: 参考 (stackoverflow)[https://stackoverflow.com/questions/20428636/how-to-convert-defaultdict-to-dict]
Copy from collections import defaultdict
recurddict = lambda : defaultdict (recurddict)
data = recurddict ()
data [ "hello" ] = "world"
data [ "good" ] [ "day" ] = True
OrderedDict
Copy from collections import OrderedDict
d = OrderedDict ({ "a" : 1 , "b" : 2 , "c" : 3 , "d" : 4 })
d . move_to_end ( "b" )
print (d) # OrderedDict([('a', 1), ('c', 3), ('d', 4), ('b', 2)])
typing【涉及到generic看不懂】
typing 模块用于注解, 这些注解在运行时不起作用, 只适用于 mypy 工具做静态检查, 也方便读者阅读
看不懂的地方大概有这么几处:
typing.TypeVar
, typing.NewType
Tuple
Tuple[int, str]
:第一个元素为整数,第二个元素类型为字符串
Tuple[Any, ...]
:等价于 Tuple
Optional
Optional[Sized]
等同于 Union[Sized, None]
。
Sized
Sized
表示有一个具有 __len__
方法的对象,
Copy from typing import Optional , Sized
def foo ( a : Optional [ Sized ] ):
pass
Callable
Callable[[int], str]
:输入是 int 类型,输出是 str 类型的函数
Callable[..., str]
:输出是 str 类型的函数,对输入不加约束
TypeAlias
, NewType
, TypeVar
TypeAlias
Copy # 以下三种写法等价: TypeAlias
from typing import TypeAlias
# Vector 与 list[float] 等同
type Vector = list [ float ] # Python 3.12 引入的关键字 type
Vector = list [ float ]
Vector : TypeAlias = list [ float ] # 推荐写法, 兼容性好
TypeVar
比较常见, 官方文档 解释如下
The preferred way to construct a type variable is via the dedicated syntax for generic functions, generic classes, and generic type aliases
Copy from typing import TypeVar
T = TypeVar ( 'T' ) # Can be anything
S = TypeVar ( 'S' , bound = str ) # Can be any subtype of str
A = TypeVar ( 'A' , str , bytes ) # Must be exactly str or bytes
def foo ( arg : T) -> T:
return arg
NewType
Copy from typing import NewType
UserID = NewType ( "UserID" , int ) # Python 3.10 之前 UserID 是一个函数, 3.10 之后是一个类
在运行时, 基本上 x = NewType("UserID", int)(x)
, Python 3.8 中对 NewType
实现源码 如下:
Copy def NewType ( name , tp ):
def new_type ( x ):
return x
new_type . __name__ = name
new_type . __supertype__ = tp
return new_type
__name__
属性探索:
Copy from typing import TypeVar , NewType
T = TypeVar ( "T displayname" ) # 一般不会让 name="T displayname" 与变量名 T 不一致
T . __name__ # "T displayname"
UserID = NewType ( "interger displayname" , int )
UserID ( 123 )
UserID . __name__ # "interger displayname"
class A :
pass
A . __name__ # "A"
TypeVar
与 NewType
涉及的 name
参数是字符串类型, 影响的是 __name__
属性
Generic
类似这个写法 List[int]
, 这种方括号的写法看起来似乎是 C++ 里的泛型(模板)一样, 我们也可以自定义 (例子参考这篇博客 )
继承 Generic
, 就可以在类里面使用“模板类型”
Copy from typing import Dict , Generic , TypeVar
T = TypeVar ( "T" )
class Registry (Generic [ T ] ):
def __init__ ( self ) -> None :
self . _store : Dict [ str , T ] = {}
def set_item ( self , k : str , v : T) -> None :
self . _store [ k ] = v
def get_item ( self , k : str ) -> T:
return self . _store [ k ]
if __name__ == "__main__" :
family_name_reg = Registry [ str ]()
family_age_reg = Registry [ int ]()
family_name_reg . set_item ( "husband" , "steve" )
family_name_reg . set_item ( "dad" , "john" )
family_age_reg . set_item ( "steve" , 30 )
还存在一个疑问:
这个例子有些看不懂, 起源是 openai==1.1.1
python 包:
Copy from openai import Client , OpenAI
client = OpenAI (
# defaults to os.environ.get("OPENAI_API_KEY")
)
client . models . list (). __class__ . __mro__
结果是
Copy (openai.pagination.SyncPage[Model],
openai.pagination.SyncPage,
openai._base_client.BaseSyncPage,
openai._base_client.BasePage,
openai._models.GenericModel,
openai._compat.GenericModel,
openai._models.BaseModel,
pydantic.main.BaseModel,
typing.Generic,
object)
于是写了个测试例子
Copy import pydantic
from openai . _models import BaseModel
ModelT = TypeVar ( "ModelT" , bound = pydantic.BaseModel)
class Page ( BaseModel , Generic [ ModelT ] ):
page : int
p = Page [ pydantic . BaseModel ]. model_validate ({ "page" : 10 })
print (p. __class__ . __mro__ )
输出:
Copy (__main__.Page[BaseModel],
__main__.Page,
openai._models.BaseModel,
pydantic.main.BaseModel,
typing.Generic,
object)
overload
Copy from typing import overload
# 注意: 此处的...是语法
@overload
def foo ( name : str ) -> str :
...
@overload
def foo ( name : float ) -> str :
...
@overload
def foo ( name : int , age : int ) -> str :
...
def foo ( name , age = 18 ):
return "hello" + str (n)
Protocol
python 3.8 新特性, 相当于 Java 的 Interface
Copy from typing import Protocol
# 定义 Protocol
class MyProtocol ( Protocol ):
def method1 ( self , x : int ) -> str :
...
def method2 ( self , y : str ) -> None :
...
# 实现 Protocol 的类型
class MyClass :
def method1 ( self , x : int ) -> str :
return str (x)
def method2 ( self , y : str ) -> None :
print (y)
# 使用 Protocol 作为 type hint
def process ( obj : MyProtocol) -> None :
obj . method2 (obj. method1 ( 42 ))
typing.cast
参考博客
Copy x = 1
typing.cast(str, x) # 运行时依旧是整数1, 但mypy检查时认为它是字符串
IO
Copy # 一般用于带有 read, write 等方法的变量
from typing import IO , TextIO , BinaryIO
fr : TextIO = open ( "x.txt" , "rt" )
fr : BinaryIO = open ( "x.pdf" , "rt" )
fr : IO = open ( "x" ) # 不确定是 binary 还是 text 时可以用更一般的 typing.IO
enum
枚举类型
Copy from enum import Enum
class MyEnum ( Enum ):
A = "a"
B = "b"
# 得到一个枚举类型的实例有几种办法
x = MyEnum . A
y = MyEnum ( "a" )
z = MyEnum [ "A" ]
(x is y) and (y is z) # True
x == "a" # False
# 列举所有的取值
for key , value in MyEnum . __members__ . items ():
# key分别为["A", "B"]
# value分别为[MyEnum.A, MyEnum.B]
...
dataclasses (python>=3.7 才可用)
引用官方文档 的说明, 这个模块主要提供了一个针对类的装饰器 dataclass
,以自动生成一些特殊函数,例如:__init__
, __repr__
, __eq__
等(对应于 C 语言中的数据结构,即没有实例函数)
This module provides a decorator and functions for automatically adding generated special methods such as init () and repr () to user-defined classes
主要的接口为:
dataclasses.dataclass
的简单用法如下:
Copy # 备注: dataclass实际上有很多参数, 例如此处指定fronzen=True, 则初始化后不能再修改数据
@dataclass (frozen = True , eq = True )
class A :
x : int # 这里的类型注解是语法强制的, 但运行时不做类型检查
y : float
z : str
def foo ( self ):
return self . x * self . y
# A.__init__函数自动生成
a = A ( 1.0 , 2 , "x" ) # 注意: 此处实际上不会做类型检查
a . foo ()
a . x = 3 # 报错
备注:dataclass
还会自动生成 __eq__
等函数, 也可以设定 eq=False
抑制这一行为
field
的简单用法如下:
Copy @dataclass ()
class A :
# y: list[int] = list() # 会发生意料不到的情况
y : list [ int ] = field (default_factory = list )
a = A ()
a . y . append ( 2 )
b = A ()
b . y . append ( 3 )
A (). y # 此时会返回[], 但如果不用field函数直接写默认值为list(), 则此时返回为[2, 3]
更为深入的细节查阅官方文档:
如果 dataclass
装饰的类发生继承关系时, 自动生成的 __init__
函数的参数顺序一般来说是先父类, 再子类。但还有许多微妙之处
将 dataclass
中的一些 field
仅作为关键字参数如何处理
unicodedata
unicode 编码的目的是攘括所有的字符,然而它本身也有版本号, python 的每个版本所支持的 unicode 版本号也不相同, 例如:
unicode 的完整列表可以参考 wiki ,unicode 的字符范围为:0-0x10FFFF
,因此最多能容纳1114112个码位, 大多数字符的编码范围在 0-0xFFFF
(最多65536个)之间。一些例子如下:
U+0025
: %
, name 为 PERCENT SIGN
U+0B90
: ஐ
, name 为 TAMIL LETTER AI
而在编码界,需要区分编码方式 与实现方式 ,上面所讲的 Unicode 属于编码方式 的范畴,即规定了字符集。而从实现方式的角度,需要将每个字符映射为一个具体的二进制表示。一个自然的方式是将所有的字符按照 Unicode 的定义方式表示为 6 位 16 进制数,但实际上为了省空间,普遍采用的 Unicode 实现方式 为 utf-8
、utf-16
等,其中最通用的是 utf-8
,而 utf-8
具体的字符与字节的对应关系此处不再做展开。
这里简要举一些 unicodedata 的使用例子,更多复杂的内容请参考维基
Copy import unicodedata
c = "ஐ"
i = ord (c) # 字符 c 的 unicode 码位, 结果为: 2960 = 0x0B90 = 11*16*16+9*16
c . encode ( "utf-8" ) # 编码为 "utf-8" 时的实际字节表示: b'\xe0\xae\x90',可以看出utf-8实现中用了3个字节
print ( "\\u {i :>04x } " ) # \\u0b90
unicodedata . name (c) # 返回字符的名字: "TAMIL LETTER AI"
unicodedata . category (c) # 字符的类别
unicodedata . normalize ( 'NFC' , '\u0043\u0327' ) # 使用 NFC 的转换方式将 C 和一个类似逗号的符号合成为 1 个符号,见备注
unicodedata . is_normalized ( 'NFC' , '\u0043\u0327' ) # False,因为这两个字符可以合并为一个字符,见备注
unicodedata . unidata_version # unicode data 的版本:'13.0.0', 不同python版本返回值不一样
备注:
字符类别参见官网 ,例如:类别 "Zs" 表示 Space Seperator, 例如空格;但制表符的类别为 "Cc" Control;而中文字符以及很多其他字符被归为 Lo
Other Letter。
字符 normalize,有几种转换方式:NFC、NFD、NFKC、NFKD。例如有些字符有多种表示:U+00C7 (LATIN CAPITAL LETTER C WITH CEDILLA,形状为字母C
下面有个类似逗号的符号) 也可以被表示为 U+0043 (LATIN CAPITAL LETTER C,字母C
) U+0327 (COMBINING CEDILLA,类似于一个逗号的符号).
subprocess (待补充)
subprocess
模块的作用是运行命令行可以执行的命令
multiprocessing
模块的典型用法是用多进程执行同一个python 代码
subprocess.run
官方文档
Copy # 函数原型
subprocess . run (args, * , stdin = None , input = None , stdout = None , stderr = None , capture_output = False , shell = False , cwd = None , timeout = None , check = False , encoding = None , errors = None , text = None , env = None , universal_newlines = None , ** other_popen_kwargs)
Copy import subprocess
cmd = [ "dir" ]
subprocess . run (cmd) # 报错: FileNotFoundError: [WinError 2] 系统找不到指定的文件。
subprocess . run (cmd, shell = True ) #正常运行
其中shell
参数的默认值为,shell=True
表示"命令"(可能用词不准确)在shell中执行, 文档中说除非必要, 否则不要设置为True. 注意: 在window下, 上述情况需设置为True
, 主要原因是windows下echo
不是一个可执行文件, 而是cmd中的一个命令.
科普链接
一个在windows环境变量PATH目录下的可执行文件(以.exe
结尾), 可以通过win+R
组合键后敲入文件名进行执行; 而echo
在windows下不是一个自带的可执行文件, 而是cmd
窗口中的一个内置命令.
windows下cmd
是一个shell, 而平时所说的dos
是一种操作系统的名字, 而dos命令
是这个操作系统中的命令. cmd
窗口下的能执行的命令与dos
命令有许多重叠之处, 但不能混为一谈.
所谓shell
, 这是一个操作系统中的概念, 不同的操作系统有不同的shell
, 常见的有: windows下的cmd
(命令行shell), powershell(命令行shell), windows terminal(命令行shell), 文件资源管理器(图形化shell); linux下的bash(命令行shell, 全称: Bourne Again shell), shell是一种脚本语言.
subprocess.check_output
用于执行脚本得到输出结果
Copy import subprocess
output = subprocess . check_output ([ "echo" , "abc" ], shell = False )
output . decode ()
multiprocessing(待补充)
multiprocessing.Process
此为一个相对底层的 API,用于直接创建子进程运行 python 代码。
用法一:使用 multiprocessing.Process
创建进程并传入需要运行的 python 函数及实参
Copy # 注意观察注释的两行所起的效果
import time
import multiprocessing
def foo ( seconds = 2 ):
print ( f "sleep { seconds } s begin." )
time . sleep (seconds)
print ( f "sleep { seconds } s end." )
t1 = time . time ()
p1 = multiprocessing . Process (target = foo, args = ( 2 ,))
p2 = multiprocessing . Process (target = foo, args = ( 2 ,))
p1 . start ()
time . sleep ( 1 )
p2 . start ()
# p1.join() # 表示停下来等p1运行完
# p2.join() # 表示停下来等p2运行完
t2 = time . time ()
print ( "运行时间: " , t2 - t1)
方法二:自己写一个类继承 multiprocessing.Process
,示例如下:
Copy import multiprocessing
class MyProcess ( multiprocessing . Process ):
def __init__ ( self , func , args ):
super (). __init__ ()
self . func = func
self . args = args
def run ( self ):
self . func (self.args)
def foo ( x ):
return x
if __name__ == "__main__" :
process = MyProcess (foo, 1 )
process . start ()
process . join ()
Pool
Copy import multiprocessing
def func ( msg ):
print (multiprocessing. current_process ().name + '-' + msg)
pool = multiprocessing . Pool (processes = 4 ) # 创建4个进程
for i in range ( 10 ):
msg = "hello %d " % (i)
pool . apply_async (func, (msg, ))
pool . close () # 关闭进程池,表示不能在往进程池中添加进程
print (multiprocessing. current_process ().name)
pool . join () # 等待进程池中的所有进程执行完毕,必须在close()之后调用
print ( "Sub-process(es) done." )
Copy <<<输出结果>>>
MainProcess
ForkPoolWorker-1-hello 0
ForkPoolWorker-2-hello 1
ForkPoolWorker-1-hello 4
ForkPoolWorker-3-hello 2
ForkPoolWorker-2-hello 5
ForkPoolWorker-1-hello 6
ForkPoolWorker-4-hello 3
ForkPoolWorker-2-hello 7
ForkPoolWorker-1-hello 8
ForkPoolWorker-2-hello 9
Sub-process(es) done.
multiprocessing.Pool
主要有apply
, apply_async
, close
, imap
, imap_unordered
, join
, map
, map_async
, starmap
, starmap_async
, terminate
close, join, terminate
close
指关闭进程池, 即不再往里面添加新的任务
join
指等待进程池内的所有进程完成任务
terminate
指立刻中断所有进程的执行
map, map_async, starmap, starmap_async
首先, python中
Copy list ( map ( lambda x , y : x + y, [ 1 , 2 , 3 ], [ 1 , 2 , 3 ]))
# 输出结果: [2, 4, 6]
list (itertools. starmap ( lambda x , y : x + y, [( 1 , 1 ), ( 2 , 2 ), ( 3 , 3 )]))
# 输出结果: [2, 4, 6]
map
只接收单参数的函数, starmap
是接受多个参数的版本. 这四个函数实际上都调用了_map_async
, 具体参见源码, map
会阻塞主进程的执行, 但子进程是并行化的. 在python官方文档 中提到, 对于序列很长的时候, 可以使用imap
并指定chunksize
参数, 能极大提升效率
Copy # `map`与`map_async`的区别
def foo2 ( t ):
time . sleep (random. random () * 5 )
print ( f "got t[0]: { t[ 0 ] } , t[1]: { t[ 1 ] } , pid: { os. getpid () } , name: { multiprocessing. current_process ().name } " )
return t [ 0 ] + (t [ 1 ] - 5 )
def test_map ():
result = pool . map (foo2, ls)
# 主进程还要干别的事
time . sleep ( 7 )
print (result)
def test_map_async ():
result = pool . map_async (foo2, ls)
# 主进程还要干别的事
time . sleep ( 7 )
pool . close ()
pool . join ()
print (result. get ())
if __name__ == "__main__" :
start = time . time ()
pool = multiprocessing . Pool ( 3 )
# print(pool._pool)
ls = [(i , i) for i in range ( 10 ) ]
ls1 = list ( range ( 10 ))
ls2 = list ( range ( 10 ))
test_map () # 17s左右
# test_map_async() # 9秒左右
end = time . time ()
print (end - start)
map, apply, imap, imap_unodered
multi-args
Concurrence
Blocking
Ordered-results
concurrence指的是子进程之间能否并发执行, blocking指的是是否阻塞主进程的执行.
最常用的是map
, 非特殊情况其他不要尝试, 简单的示例如下, 但推荐使用with
语句
Copy from multiprocessing import Pool
def f ( x ):
return x * x
if __name__ == '__main__' :
with Pool ( 5 ) as p :
print (p. map (f, [ 1 , 2 , 3 ]))
Copy import time
import random
import multiprocessing
random . seed ( 1 )
# seconds = [random.random()*5 for i in range(10)]
# print(f"seconds: {[round(_, 2) for _ in seconds]}")
# print(f"total time: {sum(seconds): .2f}")
# seconds: [0.67, 4.24, 3.82, 1.28, 2.48, 2.25, 3.26, 3.94, 0.47, 0.14]
# total time: 22.54
def print_func ( f ):
def wrapper ( * args , ** kwargs ):
print (f. __name__ )
return f ( * args, ** kwargs)
return wrapper
def foo ( x , y ):
time . sleep (random. random () * 5 )
print ( f "got x: { x } , y: { y } , pid: { os. getpid () } , name: { multiprocessing. current_process ().name } " )
return x + (y - 5 )
def foo2 ( t ):
time . sleep (random. random () * 5 )
print ( f "got t[0]: { t[ 0 ] } , t[1]: { t[ 1 ] } , pid: { os. getpid () } , name: { multiprocessing. current_process ().name } " )
return t [ 0 ] + (t [ 1 ] - 5 )
def bar ( e ):
print ( "Some error happend!" )
print (e)
return 1000
@print_func
def test_map ():
result = pool . map (foo2, ls)
print (result)
@print_func
def test_map_async ():
result = pool . map_async (foo2, ls, callback = bar)
pool . close ()
pool . join ()
print (result. get ())
@print_func
def test_apply_async ():
result = []
for _ in ls :
result . append (pool. apply_async (foo, _))
pool . close ()
pool . join ()
result = [_ . get () for _ in result]
print (result)
@print_func
def test_apply ():
result = []
for _ in ls :
result . append (pool. apply (foo, _))
pool . close ()
pool . join ()
print (result)
if __name__ == "__main__" :
start = time . time ()
pool = multiprocessing . Pool ( 3 )
# print(pool._pool)
ls = [(i , i) for i in range ( 10 ) ]
ls1 = list ( range ( 10 ))
ls2 = list ( range ( 10 ))
# test_map()
test_map_async ()
# test_apply()
# test_apply_async()
end = time . time ()
print (end - start)
Queue、Pipe
其他一些与进程, cpu相关的测试代码:
Copy import time
import os
import multiprocessing
# 片段1
def foo ( seconds = 2 ):
print ( f "sleep { seconds } s begin." )
time . sleep (seconds)
print ( f "sleep { seconds } s end." )
t1 = time . time ()
p1 = multiprocessing . Process (target = foo, args = ( 2 ,))
p2 = multiprocessing . Process (target = foo, args = ( 2 ,))
p1 . start ()
time . sleep ( 1 )
p2 . start ()
p1 . join ()
p2 . join ()
t2 = time . time ()
print ( "运行时间: " , t2 - t1)
# 片段2
def func ( msg ):
print (multiprocessing. current_process ().name + '-' + msg)
pool = multiprocessing . Pool (processes = 4 ) # 创建4个进程
for i in range ( 6 ):
msg = "hello %d " % (i)
pool . apply_async (func, (msg, ))
pool . close () # 关闭进程池,表示不能在往进程池中添加进程
print (multiprocessing. current_process ().name)
pool . join () # 等待进程池中的所有进程执行完毕,必须在close()之后调用
print ( "Sub-process(es) done." )
# 片段3
print ( 'Process ( %s ) start...' % os. getpid ())
# Only works on Unix/Linux/Mac:
pid = os . fork ()
print (pid)
if pid == 0 :
print ( 'I am child process ( %s ) and my parent is %s .' % (os. getpid (), os. getppid ()))
time . sleep ( 3 )
else :
print ( 'I ( %s ) just created a child process ( %s ).' % (os. getpid (), pid))
time . sleep ( 3 )
# 片段4
from multiprocessing import Pool
import os , time , random
def long_time_task ( name ):
print ( 'Run task %s ( %s )...' % (name, os. getpid ()))
start = time . time ()
time . sleep (random. random () * 3 )
end = time . time ()
print ( 'Task %s runs %0.2f seconds.' % (name, (end - start)))
print ( 'Parent process %s .' % os. getpid ())
p = Pool ( None )
for i in range ( 9 ):
p . apply_async (long_time_task, args = (i,))
print ( 'Waiting for all subprocesses done...' )
p . close ()
p . join ()
print ( 'All subprocesses done.' )
# 片段5
import subprocess
print ( '$ nslookup www.python.org' )
r = subprocess . call ([ 'nslookup' , 'www.python.org' ])
print ( 'Exit code:' , r)
# 片段6
print ( '$ nslookup' )
p = subprocess . Popen ([ 'nslookup' ], stdin = subprocess.PIPE, stdout = subprocess.PIPE, stderr = subprocess.PIPE)
p1 = multiprocessing . Process (target = foo, args = ( 2 ,))
print ( type (p), type (p1))
output , err = p . communicate ( b 'set q=mx\npython.org\nexit\n' )
print (output. decode ( 'utf-8' ))
print ( 'Exit code:' , p.returncode)
# 片段7
from multiprocessing import Process , Queue
import os , time , random
# 写数据进程执行的代码:
def write ( q ):
print ( 'Process to write: %s ' % os. getpid ())
for value in [ 'A' , 'B' , 'C' ] :
print ( 'Put %s to queue...' % value)
q . put (value)
time . sleep (random. random ())
# 读数据进程执行的代码:
def read ( q ):
print ( 'Process to read: %s ' % os. getpid ())
while True :
value = q . get ( True )
print ( 'Get %s from queue.' % value)
if __name__ == '__main__' :
# 父进程创建Queue,并传给各个子进程:
q = Queue ()
pw = Process (target = write, args = (q,))
pr = Process (target = read, args = (q,))
# 启动子进程pw,写入:
pw . start ()
# 启动子进程pr,读取:
pr . start ()
# 等待pw结束:
pw . join ()
# pr进程里是死循环,无法等待其结束,只能强行终止:
pr . terminate ()
# 片段8
>>> import psutil
>>> psutil . pids () # 所有进程ID
[ 3865 , 3864 , 3863 , 3856 , 3855 , 3853 , 3776 , ... , 45 , 44 , 1 , 0 ]
>>> p = psutil . Process ( 3776 ) # 获取指定进程ID=3776,其实就是当前Python交互环境
>>> p . name () # 进程名称
'python3.6'
>>> p . exe () # 进程exe路径
'/Users/michael/anaconda3/bin/python3.6'
>>> p . cwd () # 进程工作目录
'/Users/michael'
>>> p . cmdline () # 进程启动的命令行
[ 'python3' ]
>>> p . ppid () # 父进程ID
3765
>>> p . parent () # 父进程
< psutil . Process (pid = 3765 , name = 'bash' ) at 4503144040 >
>>> p . children () # 子进程列表
[]
>>> p . status () # 进程状态
'running'
>>> p . username () # 进程用户名
'michael'
>>> p . create_time () # 进程创建时间
1511052731.120333
>>> p . terminal () # 进程终端
'/dev/ttys002'
>>> p . cpu_times () # 进程使用的CPU时间
pcputimes (user = 0.081150144 , system = 0.053269812 , children_user = 0.0 , children_system = 0.0 )
>>> p . memory_info () # 进程使用的内存
pmem (rss = 8310784 , vms = 2481725440 , pfaults = 3207 , pageins = 18 )
>>> p . open_files () # 进程打开的文件
[]
>>> p . connections () # 进程相关网络连接
[]
>>> p . num_threads () # 进程的线程数量
1
>>> p . threads () # 所有线程信息
[ pthread (id = 1 , user_time = 0.090318 , system_time = 0.062736 ) ]
>>> p . environ () # 进程环境变量
{ 'SHELL' : '/bin/bash' , 'PATH' : '/usr/local/bin:/usr/bin:/bin:/usr/sbin:/sbin:...' , 'PWD' : '/Users/michael' , 'LANG' : 'zh_CN.UTF-8' , ... }
>>> p . terminate () # 结束进程
Terminated : 15 < -- 自己把自己结束了
concurrent.futures
concurrent.futures
的主要作用是创建异步的线程/进程池。主要的类为 concurrent.futures.Excutor
(异步调用), ThreadPoolExecutor
(异步线程池), ProcessPoolExecutor
(异步进程池)
concurrent.futures vs multiprocessing.Pool (待理解)
并发执行
python 中涉及关于并发执行的包及主要的类/方法有如下
threading
: threading.Thread
, threading.Pool
, threading.Queue
multiprocessing
: multiprocessing.Process
, multiprocessing.Pool
, multiprocessing.Queue
concurrent
: concurrent.futures.Excutor
, concurrent.futures.ThreadPoolExecutor
, concurrent.futures.ProcessPoolExecutor
subprocess
: subprocess.call
此处有多个疑问:
concurrent.futures.ProcessPoolExecutor
与 multiprocessing.Pool
之间的区别(待补充)
queue.Queue
与 multiprocessing.Queue
之间的区别(待补充)
例子1: 使用concurrent.futures
完成子任务异步调用
Copy from concurrent . futures import ThreadPoolExecutor , ProcessPoolExecutor
import random
import time
import threading
def foo ( uniq_id , data ):
secs = random . random () * 2
time . sleep (secs)
print ( f " { uniq_id } , foo sleep, { secs :.2f } " )
res = { "data_foo" : data [ "data" ] + 10 }
return uniq_id , res
def bar ( uniq_id , data ):
secs = random . random ()
time . sleep (secs)
print ( f " { uniq_id } , bar sleep, { secs :.2f } " )
res = { "data_bar" : data [ "data" ] + 100 }
return uniq_id , res
class Scheduler :
def __init__ ( self , names , funcs , pools ):
self . names = names
self . funcs = funcs
self . pools = pools
self . num_executors = len (funcs)
def do_task ( uniq_id , data ):
task_results = []
futures = []
time . sleep (random. random ())
for i in range (scheduler.num_executors):
time . sleep (random. random ())
futures . append (scheduler.pools[i]. submit (scheduler.funcs[i], uniq_id, data))
for i in range (scheduler.num_executors):
time . sleep (random. random ())
task_results . append (futures[i]. result ()[ 1 ])
result = dict ()
for task_result in task_results :
result . update (task_result)
print ( "处理完毕" , uniq_id, data, result)
return uniq_id , data , result
if __name__ == "__main__" :
scheduler = Scheduler (
names = [ "foo" , "bar" ],
funcs = [foo, bar],
pools = [ ProcessPoolExecutor ( 2 ), ProcessPoolExecutor ( 2 )]
)
# 模拟并发请求
n = 4
threads = []
for i in range (n):
t = threading . Thread (target = do_task, args = (i, { "data" : i}))
t . setDaemon ( True )
threads . append (t)
for t in threads :
t . start ()
for t in threads :
t . join ()
例子2: 多个进程修改变量
Copy from concurrent . futures import ProcessPoolExecutor
from multiprocessing import Manager
import time
import os
def foo ( x , data ):
data . append (x)
print (x, "Done" , id (data), data, os. getpid ())
class A :
def __init__ ( self ):
self . data = Manager (). list () # 如果是普通的列表, 则self.data将不会被修改
self . executor = ProcessPoolExecutor ( 1 )
def process ( self , i ):
self . executor . submit (foo, i, self.data) # submit 返回 future 对象
if __name__ == "__main__" :
print (os. getpid ())
a = A ()
a . process ( 1 )
a . process ( 2 )
time . sleep ( 0.2 ) # 这里不严谨, 应该用 future 对象, 确保执行完毕
print (a.data)
re、glob
此部分微妙处比较多, 许多符号例如?
有着多种含义, 需仔细校对
pattern的写法
参考
示例里pattern的首位两个正斜杠不是正则表达式的一部分, 示例中所谓的匹配实际上对应的是re.search
方法, 意思是存在子串能符合所定义的模式, 以下是python
中re
模块在单行贪婪模式, 更细节的内容参见后面.
/^abc/
可以匹配abcd
, 但不能匹配dabc
/abd$/
可以匹配cabc
, 但不能匹配abcc
大多数特殊字符^
, .
, *
, (
, {
均无需使用反斜杠转义, 但存在例外(其实还不少), 例如: [
与\
需要用反斜杠转义
/[.\["]/
表示匹配如下三个字符: .
, [
, "
备注:
正则表达式中特有的符号.*+[]^$()|{}?
, 如果需要表达它们自身, 一般需要转义. 另外, 有以下几个常用的转义字符:
匹配数字/下划线/字母(沿用计算机语言中_word_的概念)
贪婪匹配与非贪婪匹配
*
, +
, ?
, {m, n}
, {m,}
, {,n}
均为贪婪模式, 表示尽量匹配更长的字符串, 例如/t*/
匹配ttT
中的tt
. 相反地, 在上述符号后添加?
后变为非贪婪模式, 表示匹配尽量短的匹配字符串. 有一个特别的例子如下:
Copy # 可以认为原字符串被填补为了 '\0t\0T\0a\0' ?
re . findall ( "[tT]*?" , "tTa" )
# 输出: ['', 't', '', 'T', '', '']
[...]
形式的pattern存在较多特殊情况
/[tT]*/
可以匹配tTt
, 即不需要是重复地若干个t
或者若干个T
. 类似地/[tT]{2}/
也可以匹配tT
.
如果需要匹配\
, 则需要使用反斜杠将其转义, 例如: /[\\a]/
表示匹配\
或者a
.
/[a-z]/
这种表示是前闭后闭区间(首尾的两个字符都含在匹配范围内), 并且必须按照Unicode的顺序前小后大(见后文).
[
与]
不需要进行转义, 如果需要匹配]
那么]
必须放在开头, 例如/[]a]/
表示匹配]
或者a
. 但当采用非的语义时, ]
应该放在^
后面的第一个位置, 例如/[^]a]/
表示不匹配]
及a
.
如果需要匹配-
, 可以将-
放在开头或结尾, 或者是用\-
进行转义.
-
与]
混合的情形, 例子: /[-]]/
表示匹配-]
, /[]-]/
表示匹配]
或者-
.
^
字符无需进行转义, 如果需要匹配^
, 那么要避免将^
放在第一个位置即可. 例如: /[1^]/
表示匹配1
或者^
.
[]
, {}
, ()
的嵌套问题, 根据目前所知 , 一般不能多层嵌套, 具体地, ()
内[]
和{}
可以正常使用, 这是因为圆括号只是标记一个范围, 而[]
与{}
内部不能嵌套三种括号. 例如: /([tT]he)/
及/([tT]{2})/
是合法的pattern.
/\1/
这种以反斜杠开头加上数字的表示法表示匹配与上一个圆括号块完全相同的字符串, 标号从1开始, (?:)
这种写法用于取消当前块的标号
(?=)
, (?!)
被称为Lookahead Assertions, 表示匹配字符但不消耗, 例子/[A-Za-z]{2}(?!c)/
可以匹配bag
, 最终获取到ba
Copy re . findall ( "[A-Za-z] {2} (?!c)" , "bag" ) # ["ba"]
(?<=)
, (?<!)
与上两者类似, 但匹配的是前面的字符
常用函数
更多说明
贪婪/非贪婪, 单行/多行匹配模式
正则表达式也可以用于字节流的匹配
一些骚操作
Copy import re
inputStr = "hello crifan, nihao crifan" ;
replacedStr = re . sub ( r " ( hello )(\w + )( , nihao )( \2 ) " , r "\1crifanli\3\4" , inputStr)
replacedStr
python raw string : python
中的raw string
不能以奇数个反斜杠作为结尾, 例如x=r'\'
这种语句解释器会直接报错, 但x=r'\\'
会被解释器正确地解释为两个反斜杠. 其原因可以参见这个问答 , 简述如下: 在python中对raw string
的解释是碰到\
就将\
与下一个字符解释为本来的含义, 也就是说raw string
出现了反斜杠, 后面必须跟着一个字符, 所以r'\'
中的第二个单引号会被认为是一个字符, 但这样一来就没有表示字符串结尾的单引号了.
Copy # 测试
x = "a # SyntaxError: EOL while scanning string literal
x = r "\" # SyntaxError: EOL while scanning string literal
# 如果希望使用raw string但结尾有单个反斜杠的解决方法
x = r "a\cc" + "\\" # 表示a\cc\
Unicode与UTF-8的关系 : 参考阮一峰博客 , 简单来说, Unicode是一个字符代码 , 用整数(目前至多为21bit)来表示所有的字符对应关系, 而UTF-8是一种具体的字符编码 . 前者的重点在于将全世界的字符都有一个唯一的对应关系, 全球公认, 而后者的重点在于在保证能编码所有Unicode中规定地字符集的前提下, 利用更巧妙地方式对这种对应关系进行存储, 方便传输.
样例
Copy "(.)\1{2}" # 用于匹配一个字符3次, 例如:AAA, 注意不能使用(.){3}或.{3}
python re模块的实现
使用一个东西, 却不明白它的道理, 不高明
linux下的通配符(glob)
参考资料: 阮一峰博客
通配符又叫做 globbing patterns。因为 Unix 早期有一个/etc/glob
文件保存通配符模板,后来 Bash 内置了这个功能,但是这个名字被保留了下来。通配符早于正则表达式出现,可以看作是原始的正则表达式。它的功能没有正则那么强大灵活,但是胜在简单和方便。
?
表示单个字符, *
代表任意数量个字符, [abc]
表示方括号内任意一个字符, [a-z]
表示一个连续的范围, [^abc]
或[!abc]
或[^a-c]
或[!a-c]
表示排除方括号内的字符
{abc,def}
表示多字符版本的方括号, 匹配任意abc
或def
, 中间用逗号隔开, 大括号可以嵌套, 例如{j{p,pe}g}
表示jpg
或jpeg
.
{11..13}
表示11
或12
或13
, 但如果无法解释时, 例如: {1a..1c}
则模式会原样保留.
注意点: *
不能匹配/
, 所以经常会出现a/*.pdf
这种写法
ctypes
ctypes的代码运行效率不如cython
1. C/C++程序的编译
2. 使用ctypes
参考博客: Python - using C and C++ libraries with ctypes | Solarian Programmer
以下仅为tutorial(可能理解不准确)
2.1 调用动态链接库
以一个例子加以说明: 来源
Copy // adder.c, 将其编译为adder.so
// gcc -shared -o adder.so -fPIC adder.c
int add_int ( int num1 , int num2){ return num1 + num2;}
float add_float ( float num1 , float num2){ return num1 + num2;}
Copy import ctypes
adder = ctypes . CDLL ( './adder.so' ) # load the shared object file
res_int = adder . add_int ( 4 , 5 ) # Find sum of integers
print ( "Sum of 4 and 5 = " + str (res_int))
# 注意, 以下用法能确保不出错
p_a , p_b = 5.5 , 4.1 # Find sum of floats
c_a , c_b = ctypes . c_float (p_a), ctypes . c_float (p_b)
add_float = adder . add_float
add_float . restype = ctypes . c_float
res = add_float (c_a, c_b)
print ( "Sum of 5.5 and 4.1 = " , str (res))
解释: 由于python与c两种语言在数据结构上有着明显的不同, 因此利用ctypes调用C代码时需要进行相应的类型转换: 以上述的add_float
为例, python中浮点数都是双精度的(不妨记为Python-double
). 而adder.c
函数参数都是单精度的, 不妨记为C-float
, 可以将调用过程细化为几步
python数据类型转换为c数据类型: p_a
-> c_a
, p_b
-> c_b
python将c代码返回的结果转换为python类型
为了使用add_float(1.0, 2.0)
这种形式进行调用, 必须将1.0
转换为适应c
的数据形式(add_float.argtypes
), 对于返回值, 同样道理, 也应指定返回时c->python
的转换(add_float.restype
)
Copy add_float = adder . add_float
add_float . argtypes = [ctypes . c_float , ctypes . c_float]
add_float . restype = ctypes . c_float
print ( add_float ( 1.0 , 2.0 )) # ok
print ( add_float (ctypes. c_float ( 1.0 ), ctypes. c_float ( 2.0 ))) # ok
Copy add_float = adder . add_float
# add_float.argtypes = [ctypes.c_float, ctypes.c_float]
add_float . restype = ctypes . c_float
print ( add_float ( 1.0 , 2.0 )) # error
print ( add_float (ctypes. c_float ( 1.0 ), ctypes. c_float ( 2.0 ))) # ok
Copy add_float = adder . add_float
add_float . argtypes = [ctypes . c_float , ctypes . c_float]
# add_float.restype = ctypes.c_float
print ( add_float ( 1.0 , 2.0 )) # error
print ( add_float (ctypes. c_float ( 1.0 ), ctypes. c_float ( 2.0 ))) # error
经过一番探索后发现, 最好还是指定argtypes
与restype
, 前者也许可以不指定, 后者必须指定, 并且不要试图理解不指定restype
时的结果, 大概是implement dependent
的东西, 似乎也不能直接用IEEE 754
浮点数表示法进行解释. 大约是: 不指定restype
时默认是c_int
, 深入到底层时, 应该是C
端传回了一些字节, Python
端将其解读为int
, 但不能用同样的比特流解读为浮点型? (不要深究: ctypes用了cpython安装时的一些东西, 注意: cpython是python的一种实现, 也是最普遍的实现, 与cython是不同的东西)
argtypes与restype的问题参见: 链接1 , 链接2 .
2.2 类似于cython的方式
ctypes也支持直接用python代码写C代码? 但效率不如cython
Copy import ctypes
import ctypes . util
from ctypes import c_int , POINTER , CFUNCTYPE , sizeof
# 如果在linux上, 则将"msvcrt"改为"c"即可
path_libc = ctypes . util . find_library ( "msvcrt" )
libc = ctypes . CDLL (path_libc)
IntArray5 = c_int * 5
ia = IntArray5 ( 5 , 1 , 7 , 33 , 99 )
qsort = libc . qsort
qsort . restype = None
CMPFUNC = CFUNCTYPE (c_int, POINTER (c_int), POINTER (c_int))
def py_cmp_func ( a , b ):
print ( "py_cmp_func" , a[ 0 ], b[ 0 ])
return a [ 0 ] - b [ 0 ]
cmp_func = CMPFUNC (py_cmp_func)
qsort (ia, len (ia), sizeof (c_int), cmp_func)
for i in ia :
print (i, end = " " )
3. ctypes官方文档的学习记录
3.1 python类型与c类型的转换
None, int, bytes, (unicode) strings是python中能直接作为C函数参数的数据类型, 其中None代表C中的空指针, bytes与strings作为char *
与wchar_t *
的指针, int作为C中的int
类型, 但注意过大的数字传入C时会被截断. 也就是说上面的restype
与argtypes
可以不指定仅限于上述几种数据类型.
None
, integers, bytes objects and (unicode) strings are the only native Python objects that can directly be used as parameters in these function calls. None
is passed as a C NULL
pointer, bytes objects and strings are passed as pointer to the memory block that contains their data (char *
or wchar_t *
). Python integers are passed as the platforms default C int
type, their value is masked to fit into the C type.
Copy from ctypes import *
# c_int, c_float是可变的, 这种类型可以改变value
i = c_int ( 42 ) # i: c_long(42)
i . value = - 99 # i: c_long(-99)
Copy # 注意python中的int类型对应于c_void_p
# c_char_p, c_wchar_p, c_void_p这几种类型改变value实际上改变的是其指向的地址
s = "Hello World"
c_s = c_wchar_p (s) # c_s: c_wchar_p(139966222), c_s.value: "Hello World"
c_s . value = "Here" # c_s: c_wchar_p(222222222), c_s.value: "Here"
print (s) # "Hello World"
Copy # create_string_buffer=c_buffer=c_string
# create_unicode_buffer
# 如果需要可变的内存块, 则要使用ctypes.create_string_buffer函数, 此函数有多种调用方式
# 如果需要修改, 则对raw属性或者value属性进行修改即可
p = create_string_buffer ( 3 ) # 开辟3字节空间, 并将值初始化为0
print ( sizeof (p), repr (p.raw)) # 3, b'\x00\x00\x00'
p = create_string_buffer ( b "Hello" ) # 开辟6字节空间
# value用于获得去除空字符后的字节流
print ( sizeof (p), repr (p.raw), p.value) # 6, b'Hello\x00', b'Hello'
p = create_string_buffer ( b "Hello" , 10 )
print ( sizeof (p), repr (p.raw)) # 10, b'Hello\x00\x00\x00\x00\x00'
Copy >>> printf = libc.printf
>>> printf(b"Hello, %s\n", b"World!")
Hello, World!
14
>>> printf(b"Hello, %S\n", "World!")
Hello, World!
14
>>> printf(b"%d bottles of beer\n", 42)
42 bottles of beer
19
>>> printf(b"%f bottles of beer\n", 42.5)
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
ArgumentError: argument 2: exceptions.TypeError: Don't know how to convert parameter 2
>>> printf(b"An int %d, a double %f\n", 1234, c_double(3.14)) # 似乎不能用c_float
An int 1234, a double 3.140000
31
>>> # 可能C语言中%f只是用来输出双精度浮点数的?
inspect
inspect.signature
返回函数的特征标(即原型或者说是参数名列表)
inspect.stack
用于返回当前的函数调用栈
inspect.isclass(obj)
用于判断 obj 是否为一个类
输出一个实例的完整类名
Copy import numpy as np
arr = np.array ([1, 2])
cls = arr.__class__ # <class numpy.ndarray>
module: str = cls.__module__
name: str = cls.__qualname__ # 优于__name__
备注:__name__
vs __qualname__
: stackoverflow
inspect.unwrap
用于解除用 functools.wraps
修饰的装饰器
Copy import inspect
from functools import wraps
def test_decorator ( func ):
@wraps (func)
def wrapper ( * args , ** kwargs ):
print ( 'test_decorator' )
return func ( * args, ** kwargs)
return wrapper
@test_decorator
def spam ():
print ( 'spam1' , '\n' )
spam ()
print ()
spam = inspect . unwrap (spam)
spam ()
weakref
Copy import weakref
class Model :
def __init__ ( self ):
self . graph = Graph (weakref. proxy (self))
# self.graph = Graph(self)
class Graph :
def __init__ ( self , model ):
self . model_ref = model
# 创建 Model 实例
model = Model ()
graph = model . graph
# 销毁对 Model 实例的强引用
del model
# 尝试访问 Graph 实例中的弱引用
try :
print (graph.model_ref)
except Exception as e :
print ( "Exception:" , e) # 进入此处!
json
Copy import json
dict_json = { 'accountID' : '123' , 'notes' : { 'sentences color' : { 1 : 50 , 0 : 50 }, 'human color' : 2 , 'status' : '中立' }, 'result' : '' }
str_json = json . dumps (dict_json)
print (str_json)
print (json. loads (str_json))
with open ( "json_format.txt" , "w" ) as fw :
json . dump (dict_json, fw)
with open ( "json_format.txt" , "r" ) as fr :
new_dict = json . load (fr)
print (new_dict)
Copy json . dumps ( dict ) # dict->str
json . dump ( dict , fw) # dict->write file
json . loads ( str ) # str->dict
json . load (fr) # read file->dict
asyncio
概念澄清:
Asynchronous IO (async IO) : 是一个语言无关的模型
async/await : 是 Python 的关键字
coroutine : 在 Python 中是一类特殊的 generator 函数
不是太确定: 在讨论协程时, 普通函数也被称为阻塞型函数 (blocking function), 而使用 async def
定义的函数被称为 coroutine, 也被称为非阻塞型函数 (nonblocking function)
使用示例 (八股文?)
Copy import asyncio
from contextvars import copy_context
from functools import partial
async def afn ( x , y ):
return x - y
async def afm ( x , y ):
return await asyncio . get_running_loop (). run_in_executor (
None ,
partial ( copy_context ().run, lambda x , y : x * y, x, y)
)
async def run_in_executor (
executor_or_config ,
func , # func 不是 async 修饰的函数, 即普通函数(或者被称为阻塞型函数或blocking function)
* args ,
** kwargs ,
):
if executor_or_config is None or isinstance (executor_or_config, dict ):
return await asyncio . get_running_loop (). run_in_executor (
None ,
partial ( copy_context ().run, func, * args, ** kwargs),
)
return await asyncio . get_running_loop (). run_in_executor (
executor_or_config, partial (func, ** kwargs), * args
)
def foo ( x , y ):
return x + y
async def afoo ( x , y ):
return await run_in_executor (
None , foo, x, y
)
if __name__ == "__main__" :
res = asyncio . run ( afn ( 1 , 2 ))
print (res)
res = asyncio . run ( afm ( 1 , 2 ))
print (res)
res = asyncio . run ( afoo ( 1 , 2 ))
print (res)
asyncio.run(afn())
vs asyncio.get_event_loop().run_until_complete(afn())
两者基本等价? (不确定)
Copy import asyncio
async def main ():
# 可以使用 await, async for 等语法
...
asyncio . run ( main ())
# asyncio.get_event_loop().run_until_complete(main())
源码如下:
Copy # asyncio/runners.py
from . import events
def run ( main ):
if events . _get_running_loop () is not None :
raise RuntimeError ( "asyncio.run() cannot be called from a running event loop" )
loop = get_event_loop_policy (). new_event_loop () # events.new_event_loop()
get_event_loop_policy (). set_event_loop (loop) # events.set_event_loop(loop)
return loop . run_until_complete (main)
# asyncio/events.py
def get_event_loop ():
current_loop = _get_running_loop ()
if current_loop is not None :
return current_loop
return get_event_loop_policy (). get_event_loop ()
疑问:
asyncio.get_event_loop()
vs asyncio.get_running_loop()
: 前者通常是在非协程环境中使用, 例如在模块级别. 后者通常是在 async def
的函数体内被使用
run_in_executor()
vs run_until_complete()
: 前者接收的是普通函数, 后者接收的是 async def
的函数
return await
是在做什么? 仅仅是省一行代码
Copy async def example_async_function ():
result = await some_async_operation ()
return result
# 等价于
# return await some_async_operation()
select/selector
select 似乎是对系统调用的直接暴露, 以下代码段暂时不知道在做啥 https://chatgpt.com/share/2ee353c0-8043-4dd5-a8f4-a7c9504310fd
常见的 epoll 事件类型有:
select.EPOLLIN:对应的值是1,表示文件描述符可读,即有数据可以读取(例如客户端发送了数据,或者有新连接请求)。 select.EPOLLOUT:对应的值是4,表示文件描述符可写,即可以向其发送数据而不会阻塞。 select.EPOLLERR:对应的值是8,表示文件描述符上发生了错误。 select.EPOLLHUP:对应的值是16,表示文件描述符被挂起或连接关闭。 select.EPOLLET:对应的值是2的31次方,用于启用边沿触发模式(默认是水平触发)。
服务端
Copy import select
import socket
# 创建一个 TCP/IP socket
server_socket = socket . socket (socket.AF_INET, socket.SOCK_STREAM)
server_socket . bind (( 'localhost' , 8080 ))
server_socket . listen ( 5 )
server_socket . setblocking ( False )
# 创建一个 epoll 对象
epoll = select . epoll ()
# 注册服务器 socket 到 epoll,监听其可读事件
epoll . register (server_socket. fileno (), select.EPOLLIN)
try :
connections = {}
while True :
# 等待事件发生
events = epoll . poll ( 1 ) # 1秒超时
for fileno , event in events :
print (fileno, event)
if fileno == server_socket . fileno ():
# 有新的连接
connection , address = server_socket . accept ()
connection . setblocking ( False )
print ( f "Connected by { address } " )
# 注册新连接的 socket 到 epoll
epoll . register (connection. fileno (), select.EPOLLIN)
connections [ connection . fileno ()] = connection
elif event & select . EPOLLIN :
# socket 可读
data = connections [ fileno ]. recv ( 1024 )
if data :
print ( f "Received: { data. decode () } from { connections[fileno]. getpeername () } " )
# 响应客户端
connections [ fileno ]. send (data)
else :
# 关闭连接
epoll . unregister (fileno)
connections [ fileno ]. close ()
del connections [ fileno ]
finally :
epoll . unregister (server_socket. fileno ())
epoll . close ()
server_socket . close ()
客户端
Copy import socket
# 创建一个 TCP/IP socket
client_socket = socket . socket (socket.AF_INET, socket.SOCK_STREAM)
# 连接到服务器
server_address = ( 'localhost' , 8080 )
print ( f "Connecting to { server_address[ 0 ] } port { server_address[ 1 ] } " )
client_socket . connect (server_address)
try :
# 发送数据
message = 'This is the message. It will be echoed back by the server.'
print ( f "Sending: { message } " )
client_socket . sendall (message. encode ())
# 接收响应
data = client_socket . recv ( 1024 )
print ( f "Received: { data. decode () } " )
finally :
print ( "Closing connection" )
client_socket . close ()