隐水印(watermark)使用文档

目录结构

项目目录

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
├── attr_test.py                //  对图片攻击测试py文件
├── blind_watermark // 隐水印库文件夹
│ ├── att.py // 图片攻击测试
│ ├── blind_watermark.py // WaterMark类
│ ├── bwm_core.py // WaterMarkCore类
│ ├── cli_tools.py // 命令行运行方式及参数说明
│ ├── pool.py // 图片池化
│ ├── recover.py // 工具类
│ ├── version.py // 版本说明
│ ├── __init__.py // 初始化文件
├── image_watermark.py // 图片嵌入水印测试py文件
├── input.txt // 输入语料
├── input_gbk.txt // 输入语料(GBK编码)
├── input_img // 待加密图片文件夹
│ ├── new_test.jpg // 嵌入过程中对原图修改后的图片文件
│ └── test.jpg // 待加密原始图片
├── output_img // 嵌入水印后生成图片的文件夹
│ └── output1.jpg // 已嵌入水印图片
├── requirements.txt // 项目依赖
├── watermark_attr.py // 图片攻击类

类及函数说明

图片嵌入水印(image_watermark.py):

在image_watermark.py中,定义函数encryption()、extract_watermark_from_image()。

encryption()函数为嵌入水印功能的实现。首先从本地目录读取input.txt水印文本,保存到long_text变量当中,之后读取原始图片test.jpg,使用opencv读取图片,获取到图片的参数,然后分别计算原始图片最多可容纳水印信息的容量与long_text转换为字节数的大小,之后将图片信息容量与文本字节容量进行比较,如果文本字节容量超过到图片信息容量,那么就将原始图片进行像素扩大以保证能够完整的嵌入全部的信息。最后调用WaterMark类进行水印嵌入操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
import math
import zlib
import cv2
from blind_watermark import WaterMark
import os

def encryption(ori_img_name,watermark_filename):
"""
嵌入水印到图片
:return: 水印的长度(字节数)
"""
# 读取长文本
with open(watermark_filename, 'r', encoding='gbk') as file:
long_text = file.read()

# 读取原始图片
original_image = cv2.imread("input_img/"+ori_img_name)
img_shape = original_image.shape[:2] # 图片尺寸

# 计算图片中的水印容量
img_ori_byte = (img_shape[0] * img_shape[1]) // 64
byte = bin(int(zlib.compress(long_text.encode('GBK')).hex(), base=16))[2:]

# 如果水印容量大于图片容量,则调整图片大小以容纳水印
if img_ori_byte < len(byte):
mult = round(math.sqrt((len(byte) / img_ori_byte) * 2), 2)
target_size = (round(img_shape[1] * mult), round(img_shape[0] * mult))
img_resized = cv2.resize(original_image, target_size)
cv2.imwrite("input_img/new_"+ori_img_name, img_resized)

# 在调整后的图片中嵌入水印
watermark_embedder = WaterMark(password_wm=1, password_img=1)
watermark_embedder.read_img("input_img/new_"+ori_img_name)
watermark_embedder.read_wm(long_text, mode='str')
watermark_embedder.embed("output_img/output_"+ori_img_name)
return len(byte)

watermark_embedder = WaterMark(password_wm=1, password_img=1)
watermark_embedder.read_img("input_img/" + ori_img_name)
watermark_embedder.read_wm(long_text, mode='str')
watermark_embedder.embed("output_img/output_" + ori_img_name)
return len(byte)


def extract_watermark_from_image(embedded_image_path, wm_shape):
"""
从已嵌入的图片读取水印信息
:param embedded_image_path: 嵌入水印的图片路径
:param wm_shape: 水印的长度(字节数)
:return: 提取的水印信息
"""
watermark_extractor = WaterMark(password_wm=1, password_img=1)
extracted_watermark = watermark_extractor.extract(embedded_image_path, wm_shape=wm_shape, mode="str")
print(extracted_watermark)
return extracted_watermark


def print_directory_tree(path, indent="", excluded_folders=None):
"""
输出项目目录结构函数
:param path:
:param indent:
:param excluded_folders:
:return:
"""
if excluded_folders is None:
excluded_folders = []

items = os.listdir(path)
for i, item in enumerate(items):
item_path = os.path.join(path, item)
is_last = i == len(items) - 1

if os.path.isdir(item_path):
if item not in excluded_folders:
print(indent + ("└── " if is_last else "├── ") + item)
new_indent = indent + (" " if is_last else "│ ")
print_directory_tree(item_path, new_indent, excluded_folders)
else:
print(indent + ("└── " if is_last else "├── ") + item)


if __name__ == '__main__':
warter_mark_filename = "input_gbk.txt"
input_filename = "test.jpg"
wm_shape = encryption("test.jpg",warter_mark_filename)
watermark = extract_watermark_from_image("output_img/output_"+input_filename, wm_shape)
# watermark = extract_watermark_from_image('input_img/test2.jpg', wm_shape)


#--------- 输出项目目录 树形结构 -----------------
# starting_path = "./"
# exclude_folders = ["venv",".idea","__pycache__"] # 要排除的文件夹名称列表
# print_directory_tree(starting_path, excluded_folders=exclude_folders)

攻击测试类(watermark_attr.py):

在watermark_attr.py中,定义了GanWatermark类,默认情况下嵌入的水印为“深圳杯数学建模挑战赛”其中功能包含:不攻击情况下提取水印、压缩攻击并提取水印、变换格式攻击并提取水印、缩放攻击并提取水印、旋转攻击并提取水印、椒盐攻击并提取水印、遮挡攻击并提取水印、亮度攻击并提取水印。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
# -*- coding: utf-8 -*-
import cv2
import numpy as np
from blind_watermark import WaterMark, att


class GanWatermark:
def __init__(self):
"""
初始化GanWatermark类
"""
self.ori_img = cv2.imread('input_img/test.jpg', flags=cv2.IMREAD_UNCHANGED)
self.wm = '深圳杯数学建模挑战赛'
self.ori_img_shape = self.ori_img.shape[:2]
self.h, self.w = self.ori_img_shape
self.bwm = WaterMark(password_img=1, password_wm=1)
self.bwm.read_img(img=self.ori_img)
self.bwm.read_wm(self.wm, mode='str')
self.embed_img = self.bwm.embed("output_img/attr_test.jpg",)
self.len_wm = len(self.bwm.wm_bit)
print('水印长度为: {len_wm}'.format(len_wm=self.len_wm))

def unGan(self):
"""
不攻击的情况下提取水印
"""
un_gan_wm_extract = self.bwm.extract('output_img/attr_test.jpg', wm_shape=self.len_wm, mode='str')
print("不攻击的提取结果:", un_gan_wm_extract)

def compressGan(self):
"""
压缩攻击并提取水印
"""
success, compressed_img = cv2.imencode('.jpg', self.embed_img, [cv2.IMWRITE_JPEG_QUALITY, 50])
with open('output_img/compressed.jpg', 'wb') as f:
f.write(compressed_img)
compress_gan_wm_extract = self.bwm.extract('output_img/compressed.jpg', wm_shape=self.len_wm, mode='str')
print("压缩攻击的提取结果:", compress_gan_wm_extract)

def resetFormatGan(self):
"""
变换格式攻击并提取水印
"""
cv2.imwrite('output_img/reset_format.png', self.embed_img)
wm_extract = self.bwm.extract('output_img/reset_format.png', wm_shape=self.len_wm, mode='str')
print("变换格式攻击的提取结果:", wm_extract)

def resizeGan(self):
"""
缩放攻击并提取水印
"""
image_attacked = att.resize_att(input_img=self.embed_img, out_shape=(400, 300))
image_recover = att.resize_att(input_img=image_attacked, out_shape=self.ori_img_shape[::-1])
cv2.imwrite('output_img/resize.jpg', image_attacked)
cv2.imwrite('output_img/resize_recover.jpg', image_recover)
resize_extract = self.bwm.extract(embed_img=image_recover, wm_shape=self.len_wm, mode='str')
print("缩放攻击后的提取结果:", resize_extract)

def rotateGan(self):
"""
旋转攻击并提取水印
"""
angle = 60
image_attacked = att.rot_att(input_img=self.embed_img, angle=angle)
image_recover = att.rot_att(input_img=image_attacked, output_file_name='output_img/rotate_recover.jpg',
angle=-angle)
rotate_extract = self.bwm.extract(embed_img=image_recover, wm_shape=self.len_wm, mode='str')
print(f"旋转攻击angle={angle}后的提取结果:", rotate_extract)

def saltGan(self):
"""
椒盐攻击并提取水印
"""
ratio = 0.05
image_attacked = att.salt_pepper_att(input_img=self.embed_img, output_file_name='output_img/salt.jpg',
ratio=ratio)
salt_extract = self.bwm.extract(embed_img=image_attacked, wm_shape=self.len_wm, mode='str')
print(f"椒盐攻击ratio={ratio}后的提取结果:", salt_extract)

def shelterGan(self):
"""
遮挡攻击并提取水印
"""
n = 60
image_attacked = att.shelter_att(input_img=self.embed_img, output_file_name='output_img/shelter.jpg', ratio=0.1,
n=n)
shelter_extract = self.bwm.extract(embed_img=image_attacked, wm_shape=self.len_wm, mode='str')
print(f"遮挡攻击{n}次后的提取结果:", shelter_extract)
assert self.wm == shelter_extract, '提取水印和原水印不一致'

def brightGan(self):
"""
亮度攻击并提取水印
"""
img_recover = att.bright_att(input_img=self.embed_img, output_file_name='output_img/bright_recover.jpg',
ratio=1.2)
wm_extract = self.bwm.extract(embed_img=img_recover, wm_shape=self.len_wm, mode='str')
print("亮度攻击后的提取结果:", wm_extract)
assert np.all(self.wm == wm_extract), '提取水印和原水印不一致'

测试攻击(attr_test.py):

在attr_test.py文件中,实例化GanWatermark类,分别调用类中攻击函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
import watermark_attr

def gan_test():
gan_obj = watermark_attr.GanWatermark()
gan_obj.unGan() # 不攻击的情况下,提取水印
gan_obj.saltGan() # 椒盐攻击
gan_obj.compressGan() # 压缩攻击
gan_obj.brightGan() # 亮度攻击
gan_obj.resetFormatGan() # 格式攻击
gan_obj.resizeGan() # 缩放攻击
gan_obj.rotateGan() # 旋转攻击
gan_obj.shelterGan() # 遮挡攻击
if __name__ == '__main__':
gan_test()

核心类(bwm_core.py):

WaterMarkCore类其中定义了嵌入图片的核心算法以及嵌入过程,此部分****基于blind_watermark库原作者的代码上修改为符合需求的功能。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
#!/usr/bin/env python3
# coding=utf-8
# @Time : 2021/12/17
# @Author : github.com/guofei9987
import numpy as np
import copy
import cv2
from pywt import dwt2, idwt2
from .pool import AutoPool
import math


class WaterMarkCore:
def __init__(self, password_img=1, mode='common', processes=None):
self.block_shape = np.array([4, 4]) # 原
# self.block_shape = np.array([2, 2])
self.password_img = password_img
self.d1, self.d2 = 36, 20 # d1/d2 越大鲁棒性越强,但输出图片的失真越大 原
# init data
self.img, self.img_YUV = None, None # self.img 是原图,self.img_YUV 对像素做了加白偶数化
self.ca, self.hvd, = [np.array([])] * 3, [np.array([])] * 3 # 每个通道 dct 的结果
self.ca_block = [np.array([])] * 3 # 每个 channel 存一个四维 array,代表四维分块后的结果
self.ca_part = [np.array([])] * 3 # 四维分块后,有时因不整除而少一部分,self.ca_part 是少这一部分的 self.ca
self.wm_size, self.block_num = 0, 0 # 水印的长度,原图片可插入信息的个数
self.pool = AutoPool(mode=mode, processes=processes)

def init_block_index(self):
self.block_num = self.ca_block_shape[0] * self.ca_block_shape[1]
assert self.wm_size < self.block_num, IndexError(
'最多可嵌入{}kb信息,多于水印的{}kb信息,溢出'.format(self.block_num / 1000, self.wm_size / 1000))
# self.part_shape 是取整后的ca二维大小,用于嵌入时忽略右边和下面对不齐的细条部分。
self.part_shape = self.ca_block_shape[:2] * self.block_shape
self.block_index = [(i, j) for i in range(self.ca_block_shape[0]) for j in range(self.ca_block_shape[1])]

def read_img(self, filename):
# 读入图片->YUV化->加白边使像素变偶数->四维分块
img = cv2.imread(filename)
if img is None:
raise IOError("image file '{filename}' not read".format(filename=filename))

self.img = img.astype(np.float32)
self.img_shape = self.img.shape[:2]

# 如果不是偶数,那么补上白边,Y(明亮度)UV(颜色)
self.img_YUV = cv2.copyMakeBorder(cv2.cvtColor(self.img, cv2.COLOR_BGR2YUV),
0, self.img.shape[0] % 2, 0, self.img.shape[1] % 2,
cv2.BORDER_CONSTANT, value=(0, 0, 0))

self.ca_shape = [(i + 1) // 2 for i in self.img_shape]

self.ca_block_shape = (self.ca_shape[0] // self.block_shape[0], self.ca_shape[1] // self.block_shape[1],
self.block_shape[0], self.block_shape[1])
strides = 4 * np.array([self.ca_shape[1] * self.block_shape[0], self.block_shape[1], self.ca_shape[1], 1])

for channel in range(3):
self.ca[channel], self.hvd[channel] = dwt2(self.img_YUV[:, :, channel], 'haar')
# 转为4维度
self.ca_block[channel] = np.lib.stride_tricks.as_strided(self.ca[channel].astype(np.float32),
self.ca_block_shape, strides)
def read_img_arr(self, img):
# 处理透明图
self.alpha = None
if img.shape[2] == 4:
if img[:, :, 3].min() < 255:
self.alpha = img[:, :, 3]
img = img[:, :, :3]

# 读入图片->YUV化->加白边使像素变偶数->四维分块
self.img = img.astype(np.float32)
self.img_shape = self.img.shape[:2]

# 如果不是偶数,那么补上白边,Y(明亮度)UV(颜色)
self.img_YUV = cv2.copyMakeBorder(cv2.cvtColor(self.img, cv2.COLOR_BGR2YUV),
0, self.img.shape[0] % 2, 0, self.img.shape[1] % 2,
cv2.BORDER_CONSTANT, value=(0, 0, 0))

self.ca_shape = [(i + 1) // 2 for i in self.img_shape]

self.ca_block_shape = (self.ca_shape[0] // self.block_shape[0], self.ca_shape[1] // self.block_shape[1],
self.block_shape[0], self.block_shape[1])
strides = 4 * np.array([self.ca_shape[1] * self.block_shape[0], self.block_shape[1], self.ca_shape[1], 1])

for channel in range(3):
self.ca[channel], self.hvd[channel] = dwt2(self.img_YUV[:, :, channel], 'haar')
# 转为4维度
self.ca_block[channel] = np.lib.stride_tricks.as_strided(self.ca[channel].astype(np.float32),
self.ca_block_shape, strides)
def read_wm(self, wm_bit):
self.wm_bit = wm_bit
self.wm_size = wm_bit.size

def block_add_wm(self, arg):
block, shuffler, i = arg
# dct->(flatten->加密->逆flatten)->svd->打水印->逆svd->(flatten->解密->逆flatten)->逆dct
wm_1 = self.wm_bit[i % self.wm_size]
block_dct = cv2.dct(block)

# 加密(打乱顺序)
block_dct_shuffled = block_dct.flatten()[shuffler].reshape(self.block_shape)
U, s, V = np.linalg.svd(block_dct_shuffled)
s[0] = (s[0] // self.d1 + 1 / 4 + 1 / 2 * wm_1) * self.d1
if self.d2:
s[1] = (s[1] // self.d2 + 1 / 4 + 1 / 2 * wm_1) * self.d2

block_dct_flatten = np.dot(U, np.dot(np.diag(s), V)).flatten()
block_dct_flatten[shuffler] = block_dct_flatten.copy()
return cv2.idct(block_dct_flatten.reshape(self.block_shape))

def embed(self):
self.init_block_index()

embed_ca = copy.deepcopy(self.ca)
embed_YUV = [np.array([])] * 3

self.idx_shuffle = random_strategy1(self.password_img, self.block_num,
self.block_shape[0] * self.block_shape[1])
for channel in range(3):
tmp = self.pool.map(self.block_add_wm,
[(self.ca_block[channel][self.block_index[i]], self.idx_shuffle[i], i)
for i in range(self.block_num)])

for i in range(self.block_num):
self.ca_block[channel][self.block_index[i]] = tmp[i]

# 4维分块变回2维
self.ca_part[channel] = np.concatenate(np.concatenate(self.ca_block[channel], 1), 1)
# 4维分块时右边和下边不能整除的长条保留,其余是主体部分,换成 embed 之后的频域的数据
embed_ca[channel][:self.part_shape[0], :self.part_shape[1]] = self.ca_part[channel]
# 逆变换回去
embed_YUV[channel] = idwt2((embed_ca[channel], self.hvd[channel]), "haar")

# 合并3通道
embed_img_YUV = np.stack(embed_YUV, axis=2)
# 之前如果不是2的整数,增加了白边,这里去除掉
embed_img_YUV = embed_img_YUV[:self.img_shape[0], :self.img_shape[1]]
embed_img = cv2.cvtColor(embed_img_YUV, cv2.COLOR_YUV2BGR)
embed_img = np.clip(embed_img, a_min=0, a_max=255)

if self.alpha is not None:
embed_img = cv2.merge([embed_img.astype(np.uint8), self.alpha])
return embed_img

def block_get_wm(self, args):
block, shuffler = args
# dct->flatten->加密->逆flatten->svd->解水印
block_dct_shuffled = cv2.dct(block).flatten()[shuffler].reshape(self.block_shape)

U, s, V = np.linalg.svd(block_dct_shuffled)
wm = (s[0] % self.d1 > self.d1 / 2) * 1
if self.d2:
tmp = (s[1] % self.d2 > self.d2 / 2) * 1
wm = (wm * 3 + tmp * 1) / 4
return wm

def extract_raw(self, img):
# 每个分块提取 1 bit 信息
self.read_img_arr(img=img)
self.init_block_index()

wm_block_bit = np.zeros(shape=(3, self.block_num)) # 3个channel,length 个分块提取的水印,全都记录下来

self.idx_shuffle = random_strategy1(seed=self.password_img,
size=self.block_num,
block_shape=self.block_shape[0] * self.block_shape[1], # 16
)
for channel in range(3):
wm_block_bit[channel, :] = self.pool.map(self.block_get_wm,
[(self.ca_block[channel][self.block_index[i]], self.idx_shuffle[i])
for i in range(self.block_num)])
return wm_block_bit

def extract_avg(self, wm_block_bit):
# 对循环嵌入+3个 channel 求平均
wm_avg = np.zeros(shape=self.wm_size)
for i in range(self.wm_size):
wm_avg[i] = wm_block_bit[:, i::self.wm_size].mean()
return wm_avg

def extract(self, img, wm_shape):
self.wm_size = np.array(wm_shape).prod()

# 提取每个分块埋入的 bit:
wm_block_bit = self.extract_raw(img=img)
# 做平均:
wm_avg = self.extract_avg(wm_block_bit)
return wm_avg

def extract_with_kmeans(self, img, wm_shape):
wm_avg = self.extract(img=img, wm_shape=wm_shape)

return one_dim_kmeans(wm_avg)


def one_dim_kmeans(inputs):
threshold = 0
e_tol = 10 ** (-6)
center = [inputs.min(), inputs.max()] # 1. 初始化中心点
for i in range(300):
threshold = (center[0] + center[1]) / 2
is_class01 = inputs > threshold # 2. 检查所有点与这k个点之间的距离,每个点归类到最近的中心
center = [inputs[~is_class01].mean(), inputs[is_class01].mean()] # 3. 重新找中心点
if np.abs((center[0] + center[1]) / 2 - threshold) < e_tol: # 4. 停止条件
threshold = (center[0] + center[1]) / 2
break

is_class01 = inputs > threshold
return is_class01


def random_strategy1(seed, size, block_shape):
return np.random.RandomState(seed) \
.random(size=(size, block_shape)) \
.argsort(axis=1)


def random_strategy2(seed, size, block_shape):
one_line = np.random.RandomState(seed) \
.random(size=(1, block_shape)) \
.argsort(axis=1)

return np.repeat(one_line, repeats=size, axis=0)

运行

使用方法

水印嵌入:运行****image_watermark.py文件,将对input_img/test.jpg进行嵌入水印,嵌入水印后的图片保存在output_img/output_test.jpg(运行截图仅显示部分输出内容)

image-20230809192855023

图片攻击:运行****attr_test.py,将对input_img/test.jpg进行各种图片攻击,攻击后的图片结果保存在output_img当中。

image-20230809192718233

附录

**项目参考于GitHub开源仓库 **blind_watermark

原作者@guofei9987