AI 实战:垃圾分类系列(一)之快速搭建垃圾分类模型

代码模型 kira ⋅ 于 1个月前 ⋅ 617 阅读
来源:作者:szZack CSDN

前言

有网友说,如今每天去丢垃圾时,都要接受垃圾分类阿姨的灵魂拷问:“你是什么垃圾?”

Emmmm…
1
为了避免每天阿姨的灵魂拷问,我们最好是出门前提前对垃圾进行精准分类。

下面提供一种快速搭建基于深度学习(AI)的垃圾分类模型的方法,让垃圾分类不再难!


垃圾分类模型搭建

使用imagenet的1000个分类,模型网络使用inception-v3。再把1000个分类映射到垃圾的4个类别中,下面看详细步骤。

  • 搭建环境
    Ubuntu16.04
    python3.5
    tensorflow==1.4.0

    • 代码:
      classify_image.py:
# Copyright 2015 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================

"""Simple image classification with Inception.
Run image classification with Inception trained on ImageNet 2012 Challenge data
set.
This program creates a graph from a saved GraphDef protocol buffer,
and runs inference on an input JPEG image. It outputs human readable
strings of the top 5 predictions along with their probabilities.
Change the --image_file argument to any jpg image to compute a
classification of that image.
Please see the tutorial and website for a detailed description of how
to use this script to perform image recognition.
https://tensorflow.org/tutorials/image_recognition/
"""

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import argparse
import os.path
import re
import sys
import tarfile

import numpy as np
from six.moves import urllib
import tensorflow as tf

FLAGS = None

# pylint: disable=line-too-long
DATA_URL = 'http://download.tensorflow.org/models/image/imagenet/inception-2015-12-05.tgz'
# pylint: enable=line-too-long

class NodeLookup(object):
  """Converts integer node ID's to human readable labels.

  The table used by `id_to_string` is built at construction time from the
  Chinese label file. The English ImageNet mapping is still available through
  `load`.
  """

  # One "<node id><TAB><label>" record per line of the Chinese label file.
  _CHINESE_LINE = re.compile(r'(\d+)\t(.*)')
  # One "<synset uid><TAB><label>" record per line of the synset label file.
  _UID_LINE = re.compile(r'(n\d+)\t(.*)')

  def __init__(self,
               uid_chinese_lookup_path,
               label_lookup_path=None,
               uid_lookup_path=None,
               model_dir=None):
    """Builds the node ID --> Chinese label table.

    Args:
      uid_chinese_lookup_path: Path of the "<node id><TAB><label>" file.
      label_lookup_path: Optional path of the label-map proto text file.
      uid_lookup_path: Optional path of the synset-to-human label file.
      model_dir: Optional directory used to derive the two optional paths
        when they are not given. Falls back to FLAGS.model_dir when the
        command-line flags have been parsed.
    """
    # FLAGS is still None when this module is imported rather than run as a
    # script, so it must not be dereferenced unconditionally.
    if model_dir is None:
      model_dir = getattr(FLAGS, 'model_dir', None)
    if model_dir is not None:
      if not label_lookup_path:
        label_lookup_path = os.path.join(
            model_dir, 'imagenet_2012_challenge_label_map_proto.pbtxt')
      if not uid_lookup_path:
        uid_lookup_path = os.path.join(
            model_dir, 'imagenet_synset_to_human_label_map.txt')
    # Kept so callers can pass them to load() for the English labels.
    self.label_lookup_path = label_lookup_path
    self.uid_lookup_path = uid_lookup_path
    self.node_lookup = self.load_chinese_map(uid_chinese_lookup_path)

  @staticmethod
  def _parse_uid_lines(lines):
    """Parses "<synset uid><TAB><label>" lines into a uid --> label dict."""
    uid_to_human = {}
    for line in lines:
      match = NodeLookup._UID_LINE.search(line)
      if match is None:
        continue  # Blank or malformed line.
      # '.' does not match '\n' but does match '\r' (CRLF files).
      uid_to_human[match.group(1)] = match.group(2).rstrip('\r')
    return uid_to_human

  @staticmethod
  def _parse_label_map_lines(lines):
    """Parses label-map proto text lines into a node ID --> uid dict."""
    node_id_to_uid = {}
    target_class = None
    for line in lines:
      if line.startswith('  target_class:'):
        target_class = int(line.split(': ')[1])
      elif line.startswith('  target_class_string:'):
        # Strip the surrounding quotes whether or not the line ends with a
        # newline (the last line of a file may not).
        uid = line.split(': ')[1].strip().strip('"')
        if target_class is not None:
          node_id_to_uid[target_class] = uid
    return node_id_to_uid

  @staticmethod
  def _parse_chinese_lines(lines):
    """Parses "<node id><TAB><label>" lines into a node ID --> label dict."""
    node_id_to_label = {}
    for line in lines:
      match = NodeLookup._CHINESE_LINE.search(line)
      if match is None:
        continue  # Blank or malformed line.
      node_id_to_label[int(match.group(1))] = match.group(2).rstrip('\r')
    return node_id_to_label

  def load(self, label_lookup_path, uid_lookup_path):
    """Loads a human readable English name for each softmax node.

    Args:
      label_lookup_path: string UID to integer node ID.
      uid_lookup_path: string UID to human-readable string.

    Returns:
      dict from integer node ID to human-readable string.
    """
    if not tf.gfile.Exists(uid_lookup_path):
      tf.logging.fatal('File does not exist %s', uid_lookup_path)
    if not tf.gfile.Exists(label_lookup_path):
      tf.logging.fatal('File does not exist %s', label_lookup_path)

    # Loads mapping from string UID to human-readable string.
    with tf.gfile.GFile(uid_lookup_path) as uid_file:
      uid_to_human = self._parse_uid_lines(uid_file.readlines())

    # Loads mapping from string UID to integer node ID.
    with tf.gfile.GFile(label_lookup_path) as label_file:
      node_id_to_uid = self._parse_label_map_lines(label_file.readlines())

    # Loads the final mapping of integer node ID to human-readable string.
    node_id_to_name = {}
    for key, val in node_id_to_uid.items():
      if val not in uid_to_human:
        tf.logging.fatal('Failed to locate: %s', val)
      node_id_to_name[key] = uid_to_human[val]

    return node_id_to_name

  def load_chinese_map(self, uid_chinese_lookup_path):
    """Loads the node ID --> Chinese label mapping.

    Args:
      uid_chinese_lookup_path: Path of a text file with one
        "<node id><TAB><label>" record per line.

    Returns:
      dict from integer node ID to label string. Lines that do not contain
      such a record are skipped.
    """
    with tf.gfile.GFile(uid_chinese_lookup_path) as chinese_file:
      return self._parse_chinese_lines(chinese_file.readlines())

  def id_to_string(self, node_id):
    """Returns the label for node_id, or '' when the ID is unknown."""
    return self.node_lookup.get(node_id, '')

def create_graph(model_dir=None):
  """Imports the saved GraphDef file into the current default graph.

  Args:
    model_dir: Directory containing 'classify_image_graph_def.pb'. When
      omitted, FLAGS.model_dir is used, which requires the command-line flags
      to have been parsed.
  """
  if model_dir is None:
    model_dir = FLAGS.model_dir
  graph_path = os.path.join(model_dir, 'classify_image_graph_def.pb')
  # Creates graph from saved graph_def.pb.
  with tf.gfile.FastGFile(graph_path, 'rb') as f:
    graph_def = tf.GraphDef()
    graph_def.ParseFromString(f.read())
  tf.import_graph_def(graph_def, name='')

def run_inference_on_image(image):
  """Runs inference on an image and prints the top predictions.

  Each printed line holds the Chinese label of a predicted class and its
  score, highest score first.

  Args:
    image: Image file name.

  Returns:
    Nothing
  """
  if not tf.gfile.Exists(image):
    tf.logging.fatal('File does not exist %s', image)
  # Read through a context manager so the file handle is closed right away.
  with tf.gfile.FastGFile(image, 'rb') as image_file:
    image_data = image_file.read()

  # Creates graph from saved GraphDef.
  create_graph()

  with tf.Session() as sess:
    # Some useful tensors:
    # 'softmax:0': A tensor containing the normalized prediction across
    #   1000 labels.
    # 'pool_3:0': A tensor containing the next-to-last layer containing 2048
    #   float description of the image.
    # 'DecodeJpeg/contents:0': A tensor containing a string providing JPEG
    #   encoding of the image.
    # Runs the softmax tensor by feeding the image_data as input to the graph.
    softmax_tensor = sess.graph.get_tensor_by_name('softmax:0')
    predictions = sess.run(softmax_tensor,
                           {'DecodeJpeg/contents:0': image_data})
    predictions = np.squeeze(predictions)

    # Creates node ID --> chinese string lookup.
    node_lookup = NodeLookup(uid_chinese_lookup_path='./data/imagenet_2012_challenge_label_chinese_map.pbtxt')

    # Sort descending first and then truncate: slicing with [-0:] would
    # select every class when zero predictions are requested.
    num_top = max(FLAGS.num_top_predictions, 0)
    top_k = predictions.argsort()[::-1][:num_top]
    for node_id in top_k:
      human_string = node_lookup.id_to_string(node_id)
      score = predictions[node_id]
      print('%s (score = %.5f)' % (human_string, score))

def maybe_download_and_extract():
  """Download and extract model tar file.

  The archive named by DATA_URL is downloaded into FLAGS.model_dir unless a
  file of that name is already there, and is then unpacked into the same
  directory.

  Raises:
    ValueError: If an archive member would be written outside
      FLAGS.model_dir.
  """
  dest_directory = FLAGS.model_dir
  if not os.path.exists(dest_directory):
    os.makedirs(dest_directory)
  filename = DATA_URL.split('/')[-1]
  filepath = os.path.join(dest_directory, filename)
  if not os.path.exists(filepath):
    def _progress(count, block_size, total_size):
      sys.stdout.write('\r>> Downloading %s %.1f%%' % (
          filename, float(count * block_size) / float(total_size) * 100.0))
      sys.stdout.flush()
    filepath, _ = urllib.request.urlretrieve(DATA_URL, filepath, _progress)
    print()
    statinfo = os.stat(filepath)
    print('Successfully downloaded', filename, statinfo.st_size, 'bytes.')

  dest_root = os.path.realpath(dest_directory)
  # os.path.join(path, '') appends exactly one trailing separator.
  dest_prefix = os.path.join(dest_root, '')
  with tarfile.open(filepath, 'r:gz') as archive:
    # The archive comes from the network, so refuse member names that would
    # escape the destination directory (e.g. '../x' or absolute paths).
    # NOTE(review): link members are not inspected here.
    for member in archive.getmembers():
      target = os.path.realpath(os.path.join(dest_root, member.name))
      if target != dest_root and not target.startswith(dest_prefix):
        raise ValueError(
            'Refusing to extract %s outside of %s' % (member.name, dest_root))
    archive.extractall(dest_directory)

def main(_):
  """Fetches the model if needed, then classifies the requested image.

  Uses FLAGS.image_file when it is set; otherwise falls back to
  'cropped_panda.jpg' inside FLAGS.model_dir.

  Args:
    _: Unused argument list.
  """
  maybe_download_and_extract()
  if FLAGS.image_file:
    image = FLAGS.image_file
  else:
    image = os.path.join(FLAGS.model_dir, 'cropped_panda.jpg')
  run_inference_on_image(image)

if __name__ == '__main__':
  # Files expected inside --model_dir:
  #   classify_image_graph_def.pb
  #     Binary representation of the GraphDef protocol buffer.
  #   imagenet_synset_to_human_label_map.txt
  #     Map from synset ID to a human readable string.
  #   imagenet_2012_challenge_label_map_proto.pbtxt
  #     Text representation of a protocol buffer mapping a label to synset ID.
  arg_parser = argparse.ArgumentParser()
  arg_parser.add_argument(
      '--model_dir', type=str, default='/tmp/imagenet',
      help=('      Path to classify_image_graph_def.pb,\n'
            '      imagenet_synset_to_human_label_map.txt, and\n'
            '      imagenet_2012_challenge_label_map_proto.pbtxt.'
            '      '))
  arg_parser.add_argument(
      '--image_file', type=str, default='',
      help='Absolute path to image file.')
  arg_parser.add_argument(
      '--num_top_predictions', type=int, default=5,
      help='Display this many predictions.')

  # Flags this script does not recognise are forwarded to tf.app.run.
  FLAGS, remaining_argv = arg_parser.parse_known_args()
  tf.app.run(main=main, argv=[sys.argv[0]] + remaining_argv)
  • 下载模型
    python classify_image.py

    下载成功是这个样子的: 3

  • 模型测试

    从网上找一张图片,保存为:./img/2.png,如下:

    1
    测试方法:
    python classify_image.py --image_file ./img/2.png

    结果输出:

    cellular telephone, cellular phone, cellphone, cell, mobile phone (score = 0.70547)
    iPod (score = 0.06823)
    notebook, notebook computer (score = 0.04934)
    modem (score = 0.01472)
    hand-held computer, hand-held microcomputer (score = 0.00770)

    可以看到识别结果还是蛮准的,而且给出了top5.

    使用中文标签:
    测试方法:
    python classify_image.py --image_file ./img/2.png

    结果输出:

    移动电话,移动电话,手机,手机,手机 (score = 0.70547)
    iPod (score = 0.06823)
    笔记本,笔记本电脑 (score = 0.04934)
    调制解调器 (score = 0.01472)
    手持电脑,手持微电脑 (score = 0.00770)

    有了中文分类类别,下面就可以做垃圾分类映射了。

垃圾分类映射

上海将垃圾分为干垃圾、湿垃圾、可回收物、有害垃圾四种,生活垃圾主要分为干垃圾和湿垃圾。

上海生活垃圾分类标准及投放要求 【点击查看】

  • 核心思想:
    1、使用4类垃圾分类数据作为标注数据,形如

    0 饮料瓶
    1 废电池
    2 绿叶菜
    3 卫生间用纸

    2、使用TextCNN训练分类模型

    • 实战
      1、数据标注
      标注结果见:./data/train_data.txt , ./data/vilid_data.txt

    2、核心代码:
    predict.py :

    import tensorflow as tf
    import numpy as np
    import os, sys
    import data_input_helper as data_helpers
    import jieba
    
    # Parameters
    # Flags are registered on tf.flags at import time and read back through
    # the module-level FLAGS object defined at the end of this section.
    
    # Data Parameters
    # w2v_file: word-vector file handed to data_helpers.w2v_wrapper.
    tf.flags.DEFINE_string("w2v_file", "./data/word2vec.bin", "w2v_file path")
    
    # Eval Parameters
    # batch_size is not read anywhere in the code shown here.
    tf.flags.DEFINE_integer("batch_size", 64, "Batch Size (default: 64)")
    # checkpoint_dir: searched with tf.train.latest_checkpoint in init_model.
    tf.flags.DEFINE_string("checkpoint_dir", "./runs/checkpoints/", "Checkpoint directory from training run")
    
    # Misc Parameters
    # Both values are passed straight into tf.ConfigProto in init_model.
    tf.flags.DEFINE_boolean("allow_soft_placement", True, "Allow device soft device placement")
    tf.flags.DEFINE_boolean("log_device_placement", False, "Log placement of ops on devices")
    
    FLAGS = tf.flags.FLAGS
    # NOTE(review): _parse_flags() is a private tf.flags method; presumably it
    # forces the command line to be parsed here, at import time. Confirm it
    # exists in the TensorFlow version in use before upgrading.
    FLAGS._parse_flags()
    
    class RefuseClassification():
        """Maps a piece of text to one of four refuse categories.

        The prediction comes from a trained model restored from the latest
        checkpoint in FLAGS.checkpoint_dir. The predicted class index is
        translated through `refuse_classification_map`.
        """

        def __init__(self):
            """Loads the word vectors and restores the trained model."""
            # Load the word vectors.
            self.w2v_wr = data_helpers.w2v_wrapper(FLAGS.w2v_file)
            self.init_model()
            # Class index --> category name: recyclable, hazardous, wet, dry.
            self.refuse_classification_map = {0: '可回收垃圾', 1: '有害垃圾', 2: '湿垃圾', 3: '干垃圾'}

        def deal_data(self, text, max_document_length = 10):
            """Converts raw text into the index input expected by the model.

            Args:
                text: The string to classify.
                max_document_length: Length argument forwarded to
                    data_helpers.get_text_idx.

            Returns:
                Whatever data_helpers.get_text_idx returns for the
                space-joined jieba tokens of `text`.
            """
            words = jieba.cut(text)
            x_text = [' '.join(words)]
            x = data_helpers.get_text_idx(x_text, self.w2v_wr.model.vocab_hash, max_document_length)

            return x

        def init_model(self):
            """Restores the latest checkpoint into a private graph and session.

            Raises:
                IOError: If FLAGS.checkpoint_dir contains no checkpoint.
            """
            checkpoint_file = tf.train.latest_checkpoint(FLAGS.checkpoint_dir)
            if checkpoint_file is None:
                # Without this check the code would go on to look for a file
                # literally named "None.meta".
                raise IOError('No checkpoint found in %s' % FLAGS.checkpoint_dir)

            graph = tf.Graph()
            with graph.as_default():
                session_conf = tf.ConfigProto(
                    allow_soft_placement=FLAGS.allow_soft_placement,
                    log_device_placement=FLAGS.log_device_placement)
                # The session is passed explicitly wherever it is needed, so
                # it does not have to be installed as the default session.
                self.sess = tf.Session(config=session_conf)
                # Load the saved meta graph and restore variables
                saver = tf.train.import_meta_graph("{}.meta".format(checkpoint_file))
                saver.restore(self.sess, checkpoint_file)

                # Get the placeholders from the graph by name
                self.input_x = graph.get_operation_by_name("input_x").outputs[0]

                self.dropout_keep_prob = graph.get_operation_by_name("dropout_keep_prob").outputs[0]

                # Tensors we want to evaluate
                self.predictions = graph.get_operation_by_name("output/predictions").outputs[0]

        def predict(self, text):
            """Returns the refuse category name predicted for `text`."""
            x_test = self.deal_data(text, 5)
            # A keep probability of 1.0 disables dropout for inference.
            predictions = self.sess.run(self.predictions, {self.input_x: x_test, self.dropout_keep_prob: 1.0})

            refuse_text = self.refuse_classification_map[int(predictions[0])]
            return refuse_text
    
    if __name__ == "__main__":
        # Expects exactly one positional argument: the text to classify.
        if len(sys.argv) == 2:
            test = RefuseClassification()
            res = test.predict(sys.argv[1])
            print('classify:', res)
        else:
            # Say why nothing was classified instead of exiting silently.
            sys.stderr.write('Usage: python predict.py <text>\n')

    3、测试
    python textcnn/predict.py '猪肉饺子'

    输出结果:

    `classify: 湿垃圾`

整合imagenet分类模型、textcnn映射模型

  • 核心代码:
    rafuse.py
import numpy as np
import os, sys
sys.path.append('textcnn')
from textcnn.predict import RefuseClassification
from classify_image import *

class RafuseRecognize():
    """Recognizes the object in an image and maps it to a refuse category.

    An ImageNet classifier yields the top labels for the image. Each label is
    then passed to the text classifier, which returns a refuse category.
    """

    def __init__(self,
                 model_dir='/tmp/imagenet',
                 uid_chinese_lookup_path='./data/imagenet_2012_challenge_label_chinese_map.pbtxt'):
        """Loads the text classifier, the image classifier and the label table.

        Args:
            model_dir: Directory passed to create_graph and NodeLookup.
            uid_chinese_lookup_path: Label file passed to NodeLookup.
        """
        self.model_dir = model_dir
        self.refuse_classification = RefuseClassification()
        self.init_classify_image_model()
        self.node_lookup = NodeLookup(uid_chinese_lookup_path=uid_chinese_lookup_path,
                                      model_dir=model_dir)

    def init_classify_image_model(self):
        """Imports the image classification graph and opens a session on it."""
        create_graph(self.model_dir)

        self.sess = tf.Session()
        self.softmax_tensor = self.sess.graph.get_tensor_by_name('softmax:0')

    @staticmethod
    def _merge_synonyms(label):
        """Joins the distinct comma-separated parts of `label`.

        Full-width commas are treated like ASCII commas. Parts keep the order
        of their first occurrence, so the result is the same on every run;
        going through a set would make the order depend on string hashing.
        """
        seen = set()
        unique_parts = []
        for part in label.replace(',', ',').split(','):
            if part not in seen:
                seen.add(part)
                unique_parts.append(part)
        return ''.join(unique_parts)

    def recognize_image(self, image_data, top_k=5):
        """Classifies an encoded image and maps its top labels to categories.

        Args:
            image_data: Raw bytes fed to the 'DecodeJpeg/contents:0' tensor.
            top_k: Number of highest-scoring labels to report.

        Returns:
            One "<label>  =>  <category>" line per reported label, joined
            with newlines, highest score first.
        """
        predictions = self.sess.run(self.softmax_tensor,
                                    {'DecodeJpeg/contents:0': image_data})
        predictions = np.squeeze(predictions)

        # Descending by score, then truncated to the requested count.
        best_ids = predictions.argsort()[::-1][:max(top_k, 0)]
        result_list = []
        for node_id in best_ids:
            human_string = self._merge_synonyms(
                self.node_lookup.id_to_string(node_id))
            classification = self.refuse_classification.predict(human_string)
            result_list.append('%s  =>  %s' % (human_string, classification))

        return '\n'.join(result_list)

if __name__ == "__main__":
    # Expects exactly one positional argument: the image file to classify.
    if len(sys.argv) == 2:
        test = RafuseRecognize()
        # Read through a context manager so the file handle is closed.
        with tf.gfile.FastGFile(sys.argv[1], 'rb') as image_file:
            image_data = image_file.read()
        res = test.recognize_image(image_data)
        print('classify:\n%s' %(res))
    else:
        # Say why nothing was classified instead of exiting silently.
        sys.stderr.write('Usage: python rafuse.py <image_file>\n')
  • 垃圾分类识别

    • 识别
      python rafuse.py img/2.png

    输出结果:

    移动电话手机  =>  可回收垃圾
    iPod  =>  湿垃圾
    笔记本笔记本电脑  =>  可回收垃圾
    调制解调器  =>  湿垃圾
    手持电脑手持微电脑  =>  可回收垃圾

    到这里整个垃圾分类识别模型基本完成,可以看到有个别错误,这是由于训练数据太少导致的,这里就不再优化了。

参考

https://github.com/tensorflow/models

相关推荐:
AI实战:垃圾分类系列(二)之快速搭建垃圾分类模型后台服务
AI实战:垃圾分类系列(三)之快速搭建垃圾分类智能问答机器人

基于 CNN 实现垃圾分类

成为第一个点赞的人吧 :bowtie:
回复数量: 0
暂无回复~
您需要登陆以后才能留下评论!