diff options
| author | stephanchrst <stephanchrst@gmail.com> | 2022-05-10 21:51:50 +0700 |
|---|---|---|
| committer | stephanchrst <stephanchrst@gmail.com> | 2022-05-10 21:51:50 +0700 |
| commit | 3751379f1e9a4c215fb6eb898b4ccc67659b9ace (patch) | |
| tree | a44932296ef4a9b71d5f010906253d8c53727726 /addons/base_import/tests | |
| parent | 0a15094050bfde69a06d6eff798e9a8ddf2b8c21 (diff) | |
initial commit 2
Diffstat (limited to 'addons/base_import/tests')
| -rw-r--r-- | addons/base_import/tests/__init__.py | 5 | ||||
| -rw-r--r-- | addons/base_import/tests/test.ods | bin | 0 -> 4634 bytes | |||
| -rw-r--r-- | addons/base_import/tests/test.xls | bin | 0 -> 6656 bytes | |||
| -rw-r--r-- | addons/base_import/tests/test.xlsx | bin | 0 -> 6473 bytes | |||
| -rw-r--r-- | addons/base_import/tests/test_base_import.py | 762 | ||||
| -rw-r--r-- | addons/base_import/tests/test_csv_magic.py | 169 |
6 files changed, 936 insertions, 0 deletions
diff --git a/addons/base_import/tests/__init__.py b/addons/base_import/tests/__init__.py new file mode 100644 index 00000000..d0786a33 --- /dev/null +++ b/addons/base_import/tests/__init__.py @@ -0,0 +1,5 @@ +# -*- coding: utf-8 -*- +# Part of Odoo. See LICENSE file for full copyright and licensing details. + +from . import test_base_import +from . import test_csv_magic diff --git a/addons/base_import/tests/test.ods b/addons/base_import/tests/test.ods Binary files differnew file mode 100644 index 00000000..b6bfa63b --- /dev/null +++ b/addons/base_import/tests/test.ods diff --git a/addons/base_import/tests/test.xls b/addons/base_import/tests/test.xls Binary files differnew file mode 100644 index 00000000..944f14c2 --- /dev/null +++ b/addons/base_import/tests/test.xls diff --git a/addons/base_import/tests/test.xlsx b/addons/base_import/tests/test.xlsx Binary files differnew file mode 100644 index 00000000..c549c737 --- /dev/null +++ b/addons/base_import/tests/test.xlsx diff --git a/addons/base_import/tests/test_base_import.py b/addons/base_import/tests/test_base_import.py new file mode 100644 index 00000000..d4cbbbd8 --- /dev/null +++ b/addons/base_import/tests/test_base_import.py @@ -0,0 +1,762 @@ +# -*- coding: utf-8 -*- +# Part of Odoo. See LICENSE file for full copyright and licensing details. +import base64 +import difflib +import io +import pprint +import unittest + +from odoo.tests.common import TransactionCase, can_import +from odoo.modules.module import get_module_resource +from odoo.tools import mute_logger, pycompat + +ID_FIELD = { + 'id': 'id', + 'name': 'id', + 'string': "External ID", + + 'required': False, + 'fields': [], + 'type': 'id', +} + + +def make_field(name='value', string='Value', required=False, fields=[], field_type='id'): + return [ + ID_FIELD, + {'id': name, 'name': name, 'string': string, 'required': required, 'fields': fields, 'type': field_type}, + ] + + +def sorted_fields(fields): + """ recursively sort field lists to ease comparison """ + recursed = [dict(field, fields=sorted_fields(field['fields'])) for field in fields] + return sorted(recursed, key=lambda field: field['id']) + + +class BaseImportCase(TransactionCase): + + def assertEqualFields(self, fields1, fields2): + f1 = sorted_fields(fields1) + f2 = sorted_fields(fields2) + assert f1 == f2, '\n'.join(difflib.unified_diff( + pprint.pformat(f1).splitlines(), + pprint.pformat(f2).splitlines() + )) + +class TestBasicFields(BaseImportCase): + + def get_fields(self, field): + return self.env['base_import.import'].get_fields('base_import.tests.models.' + field) + + def test_base(self): + """ A basic field is not required """ + self.assertEqualFields(self.get_fields('char'), make_field(field_type='char')) + + def test_required(self): + """ Required fields should be flagged (so they can be fill-required) """ + self.assertEqualFields(self.get_fields('char.required'), make_field(required=True, field_type='char')) + + def test_readonly(self): + """ Readonly fields should be filtered out""" + self.assertEqualFields(self.get_fields('char.readonly'), [ID_FIELD]) + + def test_readonly_states(self): + """ Readonly fields with states should not be filtered out""" + self.assertEqualFields(self.get_fields('char.states'), make_field(field_type='char')) + + def test_readonly_states_noreadonly(self): + """ Readonly fields with states having nothing to do with + readonly should still be filtered out""" + self.assertEqualFields(self.get_fields('char.noreadonly'), [ID_FIELD]) + + def test_readonly_states_stillreadonly(self): + """ Readonly fields with readonly states leaving them readonly + always... filtered out""" + self.assertEqualFields(self.get_fields('char.stillreadonly'), [ID_FIELD]) + + def test_m2o(self): + """ M2O fields should allow import of themselves (name_get), + their id and their xid""" + self.assertEqualFields(self.get_fields('m2o'), make_field(field_type='many2one', fields=[ + {'id': 'value', 'name': 'id', 'string': 'External ID', 'required': False, 'fields': [], 'type': 'id'}, + {'id': 'value', 'name': '.id', 'string': 'Database ID', 'required': False, 'fields': [], 'type': 'id'}, + ])) + + def test_m2o_required(self): + """ If an m2o field is required, its three sub-fields are + required as well (the client has to handle that: requiredness + is id-based) + """ + self.assertEqualFields(self.get_fields('m2o.required'), make_field(field_type='many2one', required=True, fields=[ + {'id': 'value', 'name': 'id', 'string': 'External ID', 'required': True, 'fields': [], 'type': 'id'}, + {'id': 'value', 'name': '.id', 'string': 'Database ID', 'required': True, 'fields': [], 'type': 'id'}, + ])) + + +class TestO2M(BaseImportCase): + + def get_fields(self, field): + return self.env['base_import.import'].get_fields('base_import.tests.models.' + field) + + def test_shallow(self): + self.assertEqualFields( + self.get_fields('o2m'), [ + ID_FIELD, + {'id': 'name', 'name': 'name', 'string': "Name", 'required': False, 'fields': [], 'type': 'char',}, + { + 'id': 'value', 'name': 'value', 'string': 'Value', + 'required': False, 'type': 'one2many', + 'fields': [ + ID_FIELD, + { + 'id': 'parent_id', 'name': 'parent_id', + 'string': 'Parent', 'type': 'many2one', + 'required': False, 'fields': [ + {'id': 'parent_id', 'name': 'id', + 'string': 'External ID', 'required': False, + 'fields': [], 'type': 'id'}, + {'id': 'parent_id', 'name': '.id', + 'string': 'Database ID', 'required': False, + 'fields': [], 'type': 'id'}, + ] + }, + {'id': 'value', 'name': 'value', 'string': 'Value', + 'required': False, 'fields': [], 'type': 'integer' + }, + ] + } + ] + ) + + +class TestMatchHeadersSingle(TransactionCase): + + def test_match_by_name(self): + match = self.env['base_import.import']._match_header('f0', [{'name': 'f0'}], {}) + self.assertEqual(match, [{'name': 'f0'}]) + + def test_match_by_string(self): + match = self.env['base_import.import']._match_header('some field', [{'name': 'bob', 'string': "Some Field"}], {}) + self.assertEqual(match, [{'name': 'bob', 'string': "Some Field"}]) + + def test_nomatch(self): + match = self.env['base_import.import']._match_header('should not be', [{'name': 'bob', 'string': "wheee"}], {}) + self.assertEqual(match, []) + + def test_recursive_match(self): + f = { + 'name': 'f0', + 'string': "My Field", + 'fields': [ + {'name': 'f0', 'string': "Sub field 0", 'fields': []}, + {'name': 'f1', 'string': "Sub field 2", 'fields': []}, + ] + } + match = self.env['base_import.import']._match_header('f0/f1', [f], {}) + self.assertEqual(match, [f, f['fields'][1]]) + + def test_recursive_nomatch(self): + """ Match first level, fail to match second level + """ + f = { + 'name': 'f0', + 'string': "My Field", + 'fields': [ + {'name': 'f0', 'string': "Sub field 0", 'fields': []}, + {'name': 'f1', 'string': "Sub field 2", 'fields': []}, + ] + } + match = self.env['base_import.import']._match_header('f0/f2', [f], {}) + self.assertEqual(match, []) + + +class TestMatchHeadersMultiple(TransactionCase): + + def test_noheaders(self): + self.assertEqual( + self.env['base_import.import']._match_headers([], [], {}), ([], {}) + ) + + def test_nomatch(self): + self.assertEqual( + self.env['base_import.import']._match_headers( + iter([ + ['foo', 'bar', 'baz', 'qux'], + ['v1', 'v2', 'v3', 'v4'], + ]), + [], + {'headers': True}), + ( + ['foo', 'bar', 'baz', 'qux'], + dict.fromkeys(range(4)) + ) + ) + + def test_mixed(self): + self.assertEqual( + self.env['base_import.import']._match_headers( + iter(['foo bar baz qux/corge'.split()]), + [ + {'name': 'bar', 'string': 'Bar'}, + {'name': 'bob', 'string': 'Baz'}, + {'name': 'qux', 'string': 'Qux', 'fields': [ + {'name': 'corge', 'fields': []}, + ]} + ], + {'headers': True}), + (['foo', 'bar', 'baz', 'qux/corge'], { + 0: None, + 1: ['bar'], + 2: ['bob'], + 3: ['qux', 'corge'], + }) + ) + + +class TestColumnMapping(TransactionCase): + + def test_column_mapping(self): + import_record = self.env['base_import.import'].create({ + 'res_model': 'base_import.tests.models.preview', + 'file': u"Name,Some Value,value\n" + u"chhagan,10,1\n" + u"magan,20,2\n".encode('utf-8'), + 'file_type': 'text/csv', + 'file_name': 'data.csv', + }) + import_record.do( + ['name', 'somevalue', 'othervalue'], + ['Name', 'Some Value', 'value'], + {'quoting': '"', 'separator': ',', 'headers': True}, + True + ) + fields = self.env['base_import.mapping'].search_read( + [('res_model', '=', 'base_import.tests.models.preview')], + ['column_name', 'field_name'] + ) + self.assertItemsEqual([f['column_name'] for f in fields], ['Name', 'Some Value', 'value']) + self.assertItemsEqual([f['field_name'] for f in fields], ['somevalue', 'name', 'othervalue']) + + +class TestPreview(TransactionCase): + + def make_import(self): + import_wizard = self.env['base_import.import'].create({ + 'res_model': 'res.users', + 'file': u"로그인,언어\nbob,1\n".encode('euc_kr'), + 'file_type': 'text/csv', + 'file_name': 'kr_data.csv', + }) + return import_wizard + + @mute_logger('odoo.addons.base_import.models.base_import') + def test_encoding(self): + import_wizard = self.make_import() + result = import_wizard.parse_preview({ + 'quoting': '"', + 'separator': ',', + }) + self.assertFalse('error' in result) + + @mute_logger('odoo.addons.base_import.models.base_import') + def test_csv_errors(self): + import_wizard = self.make_import() + + result = import_wizard.parse_preview({ + 'quoting': 'foo', + 'separator': ',', + }) + self.assertTrue('error' in result) + + result = import_wizard.parse_preview({ + 'quoting': '"', + 'separator': 'bob', + }) + self.assertTrue('error' in result) + + def test_csv_success(self): + import_wizard = self.env['base_import.import'].create({ + 'res_model': 'base_import.tests.models.preview', + 'file': b'name,Some Value,Counter\n' + b'foo,1,2\n' + b'bar,3,4\n' + b'qux,5,6\n', + 'file_type': 'text/csv' + }) + + result = import_wizard.parse_preview({ + 'quoting': '"', + 'separator': ',', + 'headers': True, + }) + self.assertIsNone(result.get('error')) + self.assertEqual(result['matches'], {0: ['name'], 1: ['somevalue'], 2: None}) + self.assertEqual(result['headers'], ['name', 'Some Value', 'Counter']) + # Order depends on iteration order of fields_get + self.assertItemsEqual(result['fields'], [ + ID_FIELD, + {'id': 'name', 'name': 'name', 'string': 'Name', 'required': False, 'fields': [], 'type': 'char'}, + {'id': 'somevalue', 'name': 'somevalue', 'string': 'Some Value', 'required': True, 'fields': [], 'type': 'integer'}, + {'id': 'othervalue', 'name': 'othervalue', 'string': 'Other Variable', 'required': False, 'fields': [], 'type': 'integer'}, + ]) + self.assertEqual(result['preview'], [ + ['foo', '1', '2'], + ['bar', '3', '4'], + ['qux', '5', '6'], + ]) + + @unittest.skipUnless(can_import('xlrd'), "XLRD module not available") + def test_xls_success(self): + xls_file_path = get_module_resource('base_import', 'tests', 'test.xls') + file_content = open(xls_file_path, 'rb').read() + import_wizard = self.env['base_import.import'].create({ + 'res_model': 'base_import.tests.models.preview', + 'file': file_content, + 'file_type': 'application/vnd.ms-excel' + }) + + result = import_wizard.parse_preview({ + 'headers': True, + }) + self.assertIsNone(result.get('error')) + self.assertEqual(result['matches'], {0: ['name'], 1: ['somevalue'], 2: None}) + self.assertEqual(result['headers'], ['name', 'Some Value', 'Counter']) + self.assertItemsEqual(result['fields'], [ + ID_FIELD, + {'id': 'name', 'name': 'name', 'string': 'Name', 'required': False, 'fields': [], 'type': 'char'}, + {'id': 'somevalue', 'name': 'somevalue', 'string': 'Some Value', 'required': True, 'fields': [], 'type': 'integer'}, + {'id': 'othervalue', 'name': 'othervalue', 'string': 'Other Variable', 'required': False, 'fields': [], 'type': 'integer'}, + ]) + self.assertEqual(result['preview'], [ + ['foo', '1', '2'], + ['bar', '3', '4'], + ['qux', '5', '6'], + ]) + + @unittest.skipUnless(can_import('xlrd.xlsx'), "XLRD/XLSX not available") + def test_xlsx_success(self): + xlsx_file_path = get_module_resource('base_import', 'tests', 'test.xlsx') + file_content = open(xlsx_file_path, 'rb').read() + import_wizard = self.env['base_import.import'].create({ + 'res_model': 'base_import.tests.models.preview', + 'file': file_content, + 'file_type': 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet' + }) + + result = import_wizard.parse_preview({ + 'headers': True, + }) + self.assertIsNone(result.get('error')) + self.assertEqual(result['matches'], {0: ['name'], 1: ['somevalue'], 2: None}) + self.assertEqual(result['headers'], ['name', 'Some Value', 'Counter']) + self.assertItemsEqual(result['fields'], [ + ID_FIELD, + {'id': 'name', 'name': 'name', 'string': 'Name', 'required': False, 'fields': [], 'type': 'char'}, + {'id': 'somevalue', 'name': 'somevalue', 'string': 'Some Value', 'required': True, 'fields': [], 'type': 'integer'}, + {'id': 'othervalue', 'name': 'othervalue', 'string': 'Other Variable', 'required': False, 'fields': [], 'type': 'integer'}, + ]) + self.assertEqual(result['preview'], [ + ['foo', '1', '2'], + ['bar', '3', '4'], + ['qux', '5', '6'], + ]) + + @unittest.skipUnless(can_import('odf'), "ODFPY not available") + def test_ods_success(self): + ods_file_path = get_module_resource('base_import', 'tests', 'test.ods') + file_content = open(ods_file_path, 'rb').read() + import_wizard = self.env['base_import.import'].create({ + 'res_model': 'base_import.tests.models.preview', + 'file': file_content, + 'file_type': 'application/vnd.oasis.opendocument.spreadsheet' + }) + + result = import_wizard.parse_preview({ + 'headers': True, + }) + self.assertIsNone(result.get('error')) + self.assertEqual(result['matches'], {0: ['name'], 1: ['somevalue'], 2: None}) + self.assertEqual(result['headers'], ['name', 'Some Value', 'Counter']) + self.assertItemsEqual(result['fields'], [ + ID_FIELD, + {'id': 'name', 'name': 'name', 'string': 'Name', 'required': False, 'fields': [], 'type': 'char'}, + {'id': 'somevalue', 'name': 'somevalue', 'string': 'Some Value', 'required': True, 'fields': [], 'type': 'integer'}, + {'id': 'othervalue', 'name': 'othervalue', 'string': 'Other Variable', 'required': False, 'fields': [], 'type': 'integer'}, + ]) + self.assertEqual(result['preview'], [ + ['foo', '1', '2'], + ['bar', '3', '4'], + ['aux', '5', '6'], + ]) + +class test_convert_import_data(TransactionCase): + """ Tests conversion of base_import.import input into data which + can be fed to Model.load + """ + def test_all(self): + import_wizard = self.env['base_import.import'].create({ + 'res_model': 'base_import.tests.models.preview', + 'file': b'name,Some Value,Counter\n' + b'foo,1,2\n' + b'bar,3,4\n' + b'qux,5,6\n', + 'file_type': 'text/csv' + + }) + data, fields = import_wizard._convert_import_data( + ['name', 'somevalue', 'othervalue'], + {'quoting': '"', 'separator': ',', 'headers': True} + ) + + self.assertItemsEqual(fields, ['name', 'somevalue', 'othervalue']) + self.assertItemsEqual(data, [ + ['foo', '1', '2'], + ['bar', '3', '4'], + ['qux', '5', '6'], + ]) + + def test_date_fields(self): + import_wizard = self.env['base_import.import'].create({ + 'res_model': 'res.partner', + 'file': u'name,date,create_date\n' + u'"foo","2013年07月18日","2016-10-12 06:06"\n'.encode('utf-8'), + 'file_type': 'text/csv' + + }) + + results = import_wizard.do( + ['name', 'date', 'create_date'], + [], + { + 'date_format': '%Y年%m月%d日', + 'datetime_format': '%Y-%m-%d %H:%M', + 'quoting': '"', + 'separator': ',', + 'headers': True + } + ) + + # if results empty, no errors + self.assertItemsEqual(results['messages'], []) + + def test_parse_relational_fields(self): + """ Ensure that relational fields float and date are correctly + parsed during the import call. + """ + import_wizard = self.env['base_import.import'].create({ + 'res_model': 'res.partner', + 'file': u'name,parent_id/id,parent_id/date,parent_id/credit_limit\n' + u'"foo","__export__.res_partner_1","2017年10月12日","5,69"\n'.encode('utf-8'), + 'file_type': 'text/csv' + + }) + options = { + 'date_format': '%Y年%m月%d日', + 'quoting': '"', + 'separator': ',', + 'float_decimal_separator': ',', + 'float_thousand_separator': '.', + 'headers': True + } + data, import_fields = import_wizard._convert_import_data( + ['name', 'parent_id/.id', 'parent_id/date', 'parent_id/credit_limit'], + options + ) + result = import_wizard._parse_import_data(data, import_fields, options) + # Check if the data 5,69 as been correctly parsed. + self.assertEqual(float(result[0][-1]), 5.69) + self.assertEqual(str(result[0][-2]), '2017-10-12') + + def test_parse_scientific_notation(self): + """ Ensure that scientific notation is correctly converted to decimal """ + import_wizard = self.env['base_import.import'] + + test_options = {} + test_data = [ + ["1E+05"], + ["1.20E-05"], + ["1,9e5"], + ["9,5e-5"], + ] + expected_result = [ + ["100000.000000"], + ["0.000012"], + ["190000.000000"], + ["0.000095"], + ] + + import_wizard._parse_float_from_data(test_data, 0, 'test-name', test_options) + self.assertEqual(test_data, expected_result) + + def test_filtered(self): + """ If ``False`` is provided as field mapping for a column, + that column should be removed from importable data + """ + import_wizard = self.env['base_import.import'].create({ + 'res_model': 'base_import.tests.models.preview', + 'file': b'name,Some Value,Counter\n' + b'foo,1,2\n' + b'bar,3,4\n' + b'qux,5,6\n', + 'file_type': 'text/csv' + }) + data, fields = import_wizard._convert_import_data( + ['name', False, 'othervalue'], + {'quoting': '"', 'separator': ',', 'headers': True} + ) + + self.assertItemsEqual(fields, ['name', 'othervalue']) + self.assertItemsEqual(data, [ + ['foo', '2'], + ['bar', '4'], + ['qux', '6'], + ]) + + def test_norow(self): + """ If a row is composed only of empty values (due to having + filtered out non-empty values from it), it should be removed + """ + import_wizard = self.env['base_import.import'].create({ + 'res_model': 'base_import.tests.models.preview', + 'file': b'name,Some Value,Counter\n' + b'foo,1,2\n' + b',3,\n' + b',5,6\n', + 'file_type': 'text/csv' + }) + data, fields = import_wizard._convert_import_data( + ['name', False, 'othervalue'], + {'quoting': '"', 'separator': ',', 'headers': True} + ) + + self.assertItemsEqual(fields, ['name', 'othervalue']) + self.assertItemsEqual(data, [ + ['foo', '2'], + ['', '6'], + ]) + + def test_empty_rows(self): + import_wizard = self.env['base_import.import'].create({ + 'res_model': 'base_import.tests.models.preview', + 'file': b'name,Some Value\n' + b'foo,1\n' + b'\n' + b'bar,2\n' + b' \n' + b'\t \n', + 'file_type': 'text/csv' + }) + data, fields = import_wizard._convert_import_data( + ['name', 'somevalue'], + {'quoting': '"', 'separator': ',', 'headers': True} + ) + + self.assertItemsEqual(fields, ['name', 'somevalue']) + self.assertItemsEqual(data, [ + ['foo', '1'], + ['bar', '2'], + ]) + + def test_nofield(self): + import_wizard = self.env['base_import.import'].create({ + 'res_model': 'base_import.tests.models.preview', + 'file': b'name,Some Value,Counter\n' + b'foo,1,2\n', + 'file_type': 'text/csv' + + }) + self.assertRaises(ValueError, import_wizard._convert_import_data, [], {'quoting': '"', 'separator': ',', 'headers': True}) + + def test_falsefields(self): + import_wizard = self.env['base_import.import'].create({ + 'res_model': 'base_import.tests.models.preview', + 'file': b'name,Some Value,Counter\n' + b'foo,1,2\n', + 'file_type': 'text/csv' + }) + + self.assertRaises( + ValueError, + import_wizard._convert_import_data, + [False, False, False], + {'quoting': '"', 'separator': ',', 'headers': True}) + + def test_newline_import(self): + """ + Ensure importing keep newlines + """ + output = io.BytesIO() + writer = pycompat.csv_writer(output, quoting=1) + + data_row = [u"\tfoo\n\tbar", u" \"hello\" \n\n 'world' "] + + writer.writerow([u"name", u"Some Value"]) + writer.writerow(data_row) + + import_wizard = self.env['base_import.import'].create({ + 'res_model': 'base_import.tests.models.preview', + 'file': output.getvalue(), + 'file_type': 'text/csv', + }) + data, _ = import_wizard._convert_import_data( + ['name', 'somevalue'], + {'quoting': '"', 'separator': ',', 'headers': True} + ) + + self.assertItemsEqual(data, [data_row]) + +class TestBatching(TransactionCase): + def _makefile(self, rows): + f = io.BytesIO() + writer = pycompat.csv_writer(f, quoting=1) + writer.writerow(['name', 'counter']) + for i in range(rows): + writer.writerow(['n_%d' % i, str(i)]) + return f.getvalue() + + def test_recognize_batched(self): + import_wizard = self.env['base_import.import'].create({ + 'res_model': 'base_import.tests.models.preview', + 'file_type': 'text/csv', + }) + + import_wizard.file = self._makefile(10) + result = import_wizard.parse_preview({ + 'quoting': '"', + 'separator': ',', + 'headers': True, + 'limit': 100, + }) + self.assertIsNone(result.get('error')) + self.assertIs(result['batch'], False) + + result = import_wizard.parse_preview({ + 'quoting': '"', + 'separator': ',', + 'headers': True, + 'limit': 5, + }) + self.assertIsNone(result.get('error')) + self.assertIs(result['batch'], True) + + def test_limit_on_lines(self): + """ The limit option should be a limit on the number of *lines* + imported at at time, not the number of *records*. This is relevant + when it comes to embedded o2m. + + A big question is whether we want to round up or down (if the limit + brings us inside a record). Rounding up (aka finishing up the record + we're currently parsing) seems like a better idea: + + * if the first record has so many sub-lines it hits the limit we still + want to import it (it's probably extremely rare but it can happen) + * if we have one line per record, we probably want to import <limit> + records not <limit-1>, but if we stop in the middle of the "current + record" we'd always ignore the last record (I think) + """ + f = io.BytesIO() + writer = pycompat.csv_writer(f, quoting=1) + writer.writerow(['name', 'value/value']) + for record in range(10): + writer.writerow(['record_%d' % record, '0']) + for row in range(1, 10): + writer.writerow(['', str(row)]) + + import_wizard = self.env['base_import.import'].create({ + 'res_model': 'base_import.tests.models.o2m', + 'file_type': 'text/csv', + 'file_name': 'things.csv', + 'file': f.getvalue(), + }) + opts = {'quoting': '"', 'separator': ',', 'headers': True} + preview = import_wizard.parse_preview({**opts, 'limit': 15}) + self.assertIs(preview['batch'], True) + + results = import_wizard.do( + ['name', 'value/value'], [], + {**opts, 'limit': 5} + ) + self.assertFalse(results['messages']) + self.assertEqual(len(results['ids']), 1, "should have imported the first record in full, got %s" % results['ids']) + self.assertEqual(results['nextrow'], 10) + + results = import_wizard.do( + ['name', 'value/value'], [], + {**opts, 'limit': 15} + ) + self.assertFalse(results['messages']) + self.assertEqual(len(results['ids']), 2, "should have importe the first two records, got %s" % results['ids']) + self.assertEqual(results['nextrow'], 20) + + + def test_batches(self): + partners_before = self.env['res.partner'].search([]) + opts = {'headers': True, 'separator': ',', 'quoting': '"'} + + import_wizard = self.env['base_import.import'].create({ + 'res_model': 'res.partner', + 'file_type': 'text/csv', + 'file_name': 'clients.csv', + 'file': b"""name,email +a,a@example.com +b,b@example.com +, +c,c@example.com +d,d@example.com +e,e@example.com +f,f@example.com +g,g@example.com +""" + }) + + results = import_wizard.do(['name', 'email'], [], {**opts, 'limit': 1}) + self.assertFalse(results['messages']) + self.assertEqual(len(results['ids']), 1) + # titlerow is ignored by lastrow's counter + self.assertEqual(results['nextrow'], 1) + partners_1 = self.env['res.partner'].search([]) - partners_before + self.assertEqual(partners_1.name, 'a') + + results = import_wizard.do(['name', 'email'], [], {**opts, 'limit': 2, 'skip': 1}) + self.assertFalse(results['messages']) + self.assertEqual(len(results['ids']), 2) + # empty row should also be ignored + self.assertEqual(results['nextrow'], 3) + partners_2 = self.env['res.partner'].search([]) - (partners_before | partners_1) + self.assertEqual(partners_2.mapped('name'), ['b', 'c']) + + results = import_wizard.do(['name', 'email'], [], {**opts, 'limit': 10, 'skip': 3}) + self.assertFalse(results['messages']) + self.assertEqual(len(results['ids']), 4) + self.assertEqual(results['nextrow'], 0) + partners_3 = self.env['res.partner'].search([]) - (partners_before | partners_1 | partners_2) + self.assertEqual(partners_3.mapped('name'), ['d', 'e', 'f', 'g']) + +class test_failures(TransactionCase): + def test_big_attachments(self): + """ + Ensure big fields (e.g. b64-encoded image data) can be imported and + we're not hitting limits of the default CSV parser config + """ + from PIL import Image + + im = Image.new('RGB', (1920, 1080)) + fout = io.BytesIO() + + writer = pycompat.csv_writer(fout, dialect=None) + writer.writerows([ + [u'name', u'db_datas'], + [u'foo', base64.b64encode(im.tobytes()).decode('ascii')] + ]) + + import_wizard = self.env['base_import.import'].create({ + 'res_model': 'ir.attachment', + 'file': fout.getvalue(), + 'file_type': 'text/csv' + }) + results = import_wizard.do( + ['name', 'db_datas'], + [], + {'headers': True, 'separator': ',', 'quoting': '"'}) + self.assertFalse(results['messages'], "results should be empty on successful import") diff --git a/addons/base_import/tests/test_csv_magic.py b/addons/base_import/tests/test_csv_magic.py new file mode 100644 index 00000000..b7975337 --- /dev/null +++ b/addons/base_import/tests/test_csv_magic.py @@ -0,0 +1,169 @@ +# -*- coding: utf-8 -*- +""" +Tests for various autodetection magics for CSV imports +""" +import codecs + +from odoo.tests import common + +class ImportCase(common.TransactionCase): + def _make_import(self, contents): + return self.env['base_import.import'].create({ + 'res_model': 'base_import.tests.models.complex', + 'file_name': 'f', + 'file_type': 'text/csv', + 'file': contents, + }) + + +class TestEncoding(ImportCase): + """ + create + parse_preview -> check result options + """ + + def _check_text(self, text, encodings, **options): + options.setdefault('quoting', '"') + options.setdefault('separator', '\t') + test_text = "text\tnumber\tdate\tdatetime\n%s\t1.23.45,67\t\t\n" % text + for encoding in ['utf-8', 'utf-16', 'utf-32', *encodings]: + if isinstance(encoding, tuple): + encoding, es = encoding + else: + es = [encoding] + preview = self._make_import( + test_text.encode(encoding)).parse_preview(dict(options)) + + self.assertIsNone(preview.get('error')) + guessed = preview['options']['encoding'] + self.assertIsNotNone(guessed) + self.assertIn( + codecs.lookup(guessed).name, [ + codecs.lookup(e).name + for e in es + ] + ) + + def test_autodetect_encoding(self): + """ Check that import preview can detect & return encoding + """ + self._check_text("Iñtërnâtiônàlizætiøn", [('iso-8859-1', ['iso-8859-1', 'iso-8859-2'])]) + + self._check_text("やぶら小路の藪柑子。海砂利水魚の、食う寝る処に住む処、パイポパイポ パイポのシューリンガン。", ['eucjp', 'shift_jis', 'iso2022_jp']) + + self._check_text("대통령은 제4항과 제5항의 규정에 의하여 확정된 법률을 지체없이 공포하여야 한다, 탄핵의 결정.", ['euc_kr', 'iso2022_kr']) + + # + control in widget + def test_override_detection(self): + """ ensure an explicitly specified encoding is not overridden by the + auto-detection + """ + s = "Iñtërnâtiônàlizætiøn".encode('utf-8') + r = self._make_import(b'text\n' + s)\ + .parse_preview({ + 'quoting': '"', + 'separator': '\t', + 'encoding': 'iso-8859-1', + }) + self.assertIsNone(r.get('error')) + self.assertEqual(r['options']['encoding'], 'iso-8859-1') + self.assertEqual(r['preview'], [['text'], [s.decode('iso-8859-1')]]) + +class TestFileSeparator(ImportCase): + + def setUp(self): + super().setUp() + self.imp = self._make_import( +"""c|f +a|1 +b|2 +c|3 +d|4 +""") + + def test_explicit_success(self): + r = self.imp.parse_preview({ + 'separator': '|', + 'headers': True, + 'quoting': '"', + }) + self.assertIsNone(r.get('error')) + self.assertEqual(r['headers'], ['c', 'f']) + self.assertEqual(r['preview'], [ + ['a', '1'], + ['b', '2'], + ['c', '3'], + ['d', '4'], + ]) + self.assertEqual(r['options']['separator'], '|') + + def test_explicit_fail(self): + """ Don't protect user against making mistakes + """ + r = self.imp.parse_preview({ + 'separator': ',', + 'headers': True, + 'quoting': '"', + }) + self.assertIsNone(r.get('error')) + self.assertEqual(r['headers'], ['c|f']) + self.assertEqual(r['preview'], [ + ['a|1'], + ['b|2'], + ['c|3'], + ['d|4'], + ]) + self.assertEqual(r['options']['separator'], ',') + + def test_guess_ok(self): + r = self.imp.parse_preview({ + 'separator': '', + 'headers': True, + 'quoting': '"', + }) + self.assertIsNone(r.get('error')) + self.assertEqual(r['headers'], ['c', 'f']) + self.assertEqual(r['preview'], [ + ['a', '1'], + ['b', '2'], + ['c', '3'], + ['d', '4'], + ]) + self.assertEqual(r['options']['separator'], '|') + + def test_noguess(self): + """ If the guesser has no idea what the separator is, it defaults to + "," but should not set that value + """ + imp = self._make_import('c\na\nb\nc\nd') + r = imp.parse_preview({ + 'separator': '', + 'headers': True, + 'quoting': '"', + }) + self.assertIsNone(r.get('error')) + self.assertEqual(r['headers'], ['c']) + self.assertEqual(r['preview'], [ + ['a'], + ['b'], + ['c'], + ['d'], + ]) + self.assertEqual(r['options']['separator'], '') + +class TestNumberSeparators(common.TransactionCase): + def test_parse_float(self): + w = self.env['base_import.import'].create({ + 'res_model': 'base_import.tests.models.float', + }) + data = w._parse_import_data( + [ + ['1.62'], ['-1.62'], ['+1.62'], [' +1.62 '], ['(1.62)'], + ["1'234'567,89"], ["1.234.567'89"] + ], + ['value'], {} + ) + self.assertEqual( + [d[0] for d in data], + ['1.62', '-1.62', '+1.62', '+1.62', '-1.62', + '1234567.89', '1234567.89'] + ) |
